This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 580c111 KAFKA-12662: add unit test for ProducerPerformance (#10588)
580c111 is described below
commit 580c1112582ed6ddaca6c0402a1bb014c36235e5
Author: CHUN-HAO TANG <[email protected]>
AuthorDate: Thu Jun 17 20:07:12 2021 +0800
KAFKA-12662: add unit test for ProducerPerformance (#10588)
Reviewers: Luke Chen <[email protected]>, wenbingshen
<[email protected]>, dengziming <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
build.gradle | 2 +-
gradle/dependencies.gradle | 1 +
.../apache/kafka/tools/ProducerPerformance.java | 110 +++++++++------
.../kafka/tools/ProducerPerformanceTest.java | 157 +++++++++++++++++++++
4 files changed, 227 insertions(+), 43 deletions(-)
diff --git a/build.gradle b/build.gradle
index e579e7a..11f0218 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1582,7 +1582,7 @@ project(':tools') {
testImplementation libs.junitJupiter
testImplementation project(':clients').sourceSets.test.output
testImplementation libs.mockitoInline // supports mocking static methods,
final classes, etc.
-
+ testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
testRuntimeOnly libs.slf4jlog4j
}
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 069d3f6..82d6798 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -173,6 +173,7 @@ libs += [
metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
mockitoCore: "org.mockito:mockito-core:$versions.mockito",
mockitoInline: "org.mockito:mockito-inline:$versions.mockito",
+ mockitoJunitJupiter: "org.mockito:mockito-junit-jupiter:$versions.mockito",
nettyHandler: "io.netty:netty-handler:$versions.netty",
nettyTransportNativeEpoll:
"io.netty:netty-transport-native-epoll:$versions.netty",
powermockJunit4: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 0ddff32..1f47bbc 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -46,6 +47,11 @@ import org.apache.kafka.common.utils.Utils;
public class ProducerPerformance {
public static void main(String[] args) throws Exception {
+ ProducerPerformance perf = new ProducerPerformance();
+ perf.start(args);
+ }
+
+ void start(String[] args) throws IOException {
ArgumentParser parser = argParser();
try {
@@ -71,41 +77,11 @@ public class ProducerPerformance {
throw new ArgumentParserException("Either --producer-props or
--producer.config must be specified.", parser);
}
- List<byte[]> payloadByteList = new ArrayList<>();
- if (payloadFilePath != null) {
- Path path = Paths.get(payloadFilePath);
- System.out.println("Reading payloads from: " +
path.toAbsolutePath());
- if (Files.notExists(path) || Files.size(path) == 0) {
- throw new IllegalArgumentException("File does not exist
or empty file provided.");
- }
-
- String[] payloadList = new String(Files.readAllBytes(path),
StandardCharsets.UTF_8).split(payloadDelimiter);
-
- System.out.println("Number of messages read: " +
payloadList.length);
+ List<byte[]> payloadByteList = readPayloadFile(payloadFilePath,
payloadDelimiter);
- for (String payload : payloadList) {
-
payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
- }
- }
+ Properties props = readProps(producerProps, producerConfig,
transactionalId, transactionsEnabled);
- Properties props = new Properties();
- if (producerConfig != null) {
- props.putAll(Utils.loadProps(producerConfig));
- }
- if (producerProps != null)
- for (String prop : producerProps) {
- String[] pieces = prop.split("=");
- if (pieces.length != 2)
- throw new IllegalArgumentException("Invalid property:
" + prop);
- props.put(pieces[0], pieces[1]);
- }
-
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
- if (transactionsEnabled)
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
transactionalId);
-
- KafkaProducer<byte[], byte[]> producer = new
KafkaProducer<>(props);
+ KafkaProducer<byte[], byte[]> producer =
createKafkaProducer(props);
if (transactionsEnabled)
producer.initTransactions();
@@ -126,14 +102,7 @@ public class ProducerPerformance {
long transactionStartTime = 0;
for (long i = 0; i < numRecords; i++) {
- if (payloadFilePath != null) {
- payload =
payloadByteList.get(random.nextInt(payloadByteList.size()));
- } else if (recordSize != null) {
- for (int j = 0; j < payload.length; ++j)
- payload[j] = (byte) (random.nextInt(26) + 65);
- } else {
- throw new IllegalArgumentException("no payload File Path
or record Size provided");
- }
+ payload = generateRandomPayload(recordSize, payloadByteList,
payload, random);
if (transactionsEnabled && currentTransactionSize == 0) {
producer.beginTransaction();
@@ -190,8 +159,65 @@ public class ProducerPerformance {
}
+ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
+ return new KafkaProducer<>(props);
+ }
+
+ static byte[] generateRandomPayload(Integer recordSize, List<byte[]>
payloadByteList, byte[] payload,
+ Random random) {
+ if (!payloadByteList.isEmpty()) {
+ payload =
payloadByteList.get(random.nextInt(payloadByteList.size()));
+ } else if (recordSize != null) {
+ for (int j = 0; j < payload.length; ++j)
+ payload[j] = (byte) (random.nextInt(26) + 65);
+ } else {
+ throw new IllegalArgumentException("no payload File Path or record
Size provided");
+ }
+ return payload;
+ }
+
+ static Properties readProps(List<String> producerProps, String
producerConfig, String transactionalId,
+ boolean transactionsEnabled) throws IOException {
+ Properties props = new Properties();
+ if (producerConfig != null) {
+ props.putAll(Utils.loadProps(producerConfig));
+ }
+ if (producerProps != null)
+ for (String prop : producerProps) {
+ String[] pieces = prop.split("=");
+ if (pieces.length != 2)
+ throw new IllegalArgumentException("Invalid property: " +
prop);
+ props.put(pieces[0], pieces[1]);
+ }
+
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
+ if (transactionsEnabled)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+ return props;
+ }
+
+ static List<byte[]> readPayloadFile(String payloadFilePath, String
payloadDelimiter) throws IOException {
+ List<byte[]> payloadByteList = new ArrayList<>();
+ if (payloadFilePath != null) {
+ Path path = Paths.get(payloadFilePath);
+ System.out.println("Reading payloads from: " +
path.toAbsolutePath());
+ if (Files.notExists(path) || Files.size(path) == 0) {
+ throw new IllegalArgumentException("File does not exist or
empty file provided.");
+ }
+
+ String[] payloadList = new String(Files.readAllBytes(path),
StandardCharsets.UTF_8).split(payloadDelimiter);
+
+ System.out.println("Number of messages read: " +
payloadList.length);
+
+ for (String payload : payloadList) {
+ payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+ return payloadByteList;
+ }
+
/** Get the command-line argument parser. */
- private static ArgumentParser argParser() {
+ static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("producer-performance")
.defaultHelp(true)
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
new file mode 100644
index 0000000..be037bd
--- /dev/null
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+public class ProducerPerformanceTest {
+
+ @Mock
+ KafkaProducer<byte[], byte[]> producerMock;
+
+ @Spy
+ ProducerPerformance producerPerformanceSpy;
+
+ private File createTempFile(String contents) throws IOException {
+ File file = File.createTempFile("ProducerPerformanceTest", ".tmp");
+ file.deleteOnExit();
+ Files.write(file.toPath(), contents.getBytes());
+ return file;
+ }
+
+ @Test
+ public void testReadPayloadFile() throws Exception {
+ File payloadFile = createTempFile("Hello\nKafka");
+ String payloadFilePath = payloadFile.getAbsolutePath();
+ String payloadDelimiter = "\n";
+
+ List<byte[]> payloadByteList =
ProducerPerformance.readPayloadFile(payloadFilePath, payloadDelimiter);
+
+ assertEquals(2, payloadByteList.size());
+ assertEquals("Hello", new String(payloadByteList.get(0)));
+ assertEquals("Kafka", new String(payloadByteList.get(1)));
+ }
+
+ @Test
+ public void testReadProps() throws Exception {
+ List<String> producerProps =
Collections.singletonList("bootstrap.servers=localhost:9000");
+ String producerConfig = createTempFile("acks=1").getAbsolutePath();
+ String transactionalId = "1234";
+ boolean transactionsEnabled = true;
+
+ Properties prop = ProducerPerformance.readProps(producerProps,
producerConfig, transactionalId, transactionsEnabled);
+
+ assertNotNull(prop);
+ assertEquals(5, prop.size());
+ }
+
+ @Test
+ public void testNumberOfCallsForSendAndClose() throws IOException {
+ doReturn(null).when(producerMock).send(any(), any());
+
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+
+ String[] args = new String[] {
+ "--topic", "Hello-Kafka",
+ "--num-records", "5",
+ "--throughput", "100",
+ "--record-size", "100",
+ "--producer-props", "bootstrap.servers=localhost:9000"};
+ producerPerformanceSpy.start(args);
+ verify(producerMock, times(5)).send(any(), any());
+ verify(producerMock, times(1)).close();
+ }
+
+ @Test
+ public void testUnexpectedArg() {
+ String[] args = new String[] {
+ "--test", "test",
+ "--topic", "Hello-Kafka",
+ "--num-records", "5",
+ "--throughput", "100",
+ "--record-size", "100",
+ "--producer-props", "bootstrap.servers=localhost:9000"};
+ ArgumentParser parser = ProducerPerformance.argParser();
+ ArgumentParserException thrown =
assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
+ assertEquals("unrecognized arguments: '--test'", thrown.getMessage());
+ }
+
+ @Test
+ public void testGenerateRandomPayloadByPayloadFile() {
+ Integer recordSize = null;
+ String inputString = "Hello Kafka";
+ byte[] byteArray = inputString.getBytes(StandardCharsets.UTF_8);
+ List<byte[]> payloadByteList = new ArrayList<>();
+ payloadByteList.add(byteArray);
+ byte[] payload = null;
+ Random random = new Random(0);
+
+ payload = ProducerPerformance.generateRandomPayload(recordSize,
payloadByteList, payload, random);
+ assertEquals(inputString, new String(payload));
+ }
+
+ @Test
+ public void testGenerateRandomPayloadByRecordSize() {
+ Integer recordSize = 100;
+ byte[] payload = new byte[recordSize];
+ List<byte[]> payloadByteList = new ArrayList<>();
+ Random random = new Random(0);
+
+ payload = ProducerPerformance.generateRandomPayload(recordSize,
payloadByteList, payload, random);
+ for (byte b : payload) {
+ assertNotEquals(0, b);
+ }
+ }
+
+ @Test
+ public void testGenerateRandomPayloadException() {
+ Integer recordSize = null;
+ byte[] payload = null;
+ List<byte[]> payloadByteList = new ArrayList<>();
+ Random random = new Random(0);
+
+ IllegalArgumentException thrown =
assertThrows(IllegalArgumentException.class, () ->
ProducerPerformance.generateRandomPayload(recordSize, payloadByteList, payload,
random));
+ assertEquals("no payload File Path or record Size provided",
thrown.getMessage());
+ }
+}