This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aef004edeec KAFKA-14812:ProducerPerformance still counting successful
sending in console when sending failed (#13404)
aef004edeec is described below
commit aef004edeec36ac958dbb5499522115b7e99803b
Author: hudeqi <[email protected]>
AuthorDate: Tue Mar 21 16:59:18 2023 +0800
KAFKA-14812:ProducerPerformance still counting successful sending in
console when sending failed (#13404)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../apache/kafka/tools/ProducerPerformance.java | 47 ++++++++++----
.../kafka/tools/ProducerPerformanceTest.java | 73 ++++++++++++++++++++++
2 files changed, 107 insertions(+), 13 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 6774f235114..d8a0f260691 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -94,7 +94,7 @@ public class ProducerPerformance {
}
Random random = new Random(0);
ProducerRecord<byte[], byte[]> record;
- Stats stats = new Stats(numRecords, 5000);
+ stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
@@ -113,7 +113,7 @@ public class ProducerPerformance {
record = new ProducerRecord<>(topicName, payload);
long sendStartMs = System.currentTimeMillis();
- Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
+ cb = new PerfCallback(sendStartMs, payload.length, stats);
producer.send(record, cb);
currentTransactionSize++;
@@ -164,6 +164,10 @@ public class ProducerPerformance {
return new KafkaProducer<>(props);
}
+ Callback cb;
+
+ Stats stats;
+
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
Random random) {
if (!payloadByteList.isEmpty()) {
@@ -363,7 +367,7 @@ public class ProducerPerformance {
this.reportingInterval = reportingInterval;
}
- public void record(long iter, int latency, int bytes, long time) {
+ public void record(int latency, int bytes, long time) {
this.count++;
this.bytes += bytes;
this.totalLatency += latency;
@@ -372,7 +376,7 @@ public class ProducerPerformance {
this.windowBytes += bytes;
this.windowTotalLatency += latency;
this.windowMaxLatency = Math.max(windowMaxLatency, latency);
- if (iter % this.sampling == 0) {
+ if (this.iteration % this.sampling == 0) {
this.latencies[index] = latency;
this.index++;
}
@@ -383,10 +387,24 @@ public class ProducerPerformance {
}
}
- public Callback nextCompletion(long start, int bytes, Stats stats) {
- Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
- this.iteration++;
- return cb;
+ public long totalCount() {
+ return this.count;
+ }
+
+ public long currentWindowCount() {
+ return this.windowCount;
+ }
+
+ public long iteration() {
+ return this.iteration;
+ }
+
+ public long bytes() {
+ return this.bytes;
+ }
+
+ public int index() {
+ return this.index;
}
public void printWindow() {
@@ -438,23 +456,26 @@ public class ProducerPerformance {
}
}
- private static final class PerfCallback implements Callback {
+ static final class PerfCallback implements Callback {
private final long start;
- private final long iteration;
private final int bytes;
private final Stats stats;
- public PerfCallback(long iter, long start, int bytes, Stats stats) {
+ public PerfCallback(long start, int bytes, Stats stats) {
this.start = start;
this.stats = stats;
- this.iteration = iter;
this.bytes = bytes;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long now = System.currentTimeMillis();
int latency = (int) (now - start);
- this.stats.record(iteration, latency, bytes, now);
+ // It will only be counted when the sending is successful, otherwise the number of sent records may be
+ // magically printed when the sending fails.
+ if (exception == null) {
+ this.stats.record(latency, bytes, now);
+ this.stats.iteration++;
+ }
if (exception != null)
exception.printStackTrace();
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index b8adf5d8eb2..70717938845 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.tools;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.errors.AuthorizationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -35,13 +37,19 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -104,6 +112,49 @@ public class ProducerPerformanceTest {
verify(producerMock, times(1)).close();
}
+ @Test
+ public void testNumberOfSuccessfulSendAndClose() throws IOException {
+ doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+ doAnswer(invocation -> {
+ producerPerformanceSpy.cb.onCompletion(null, null);
+ return null;
+ }).when(producerMock).send(any(), any());
+
+ String[] args = new String[] {
+ "--topic", "Hello-Kafka",
+ "--num-records", "10",
+ "--throughput", "1",
+ "--record-size", "100",
+ "--producer-props", "bootstrap.servers=localhost:9000"};
+ producerPerformanceSpy.start(args);
+
+ verify(producerMock, times(10)).send(any(), any());
+ assertEquals(10, producerPerformanceSpy.stats.totalCount());
+ verify(producerMock, times(1)).close();
+ }
+
+ @Test
+ public void testNumberOfFailedSendAndClose() throws IOException {
+ doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+ doAnswer(invocation -> {
+ producerPerformanceSpy.cb.onCompletion(null, new AuthorizationException("not authorized."));
+ return null;
+ }).when(producerMock).send(any(), any());
+
+ String[] args = new String[] {
+ "--topic", "Hello-Kafka",
+ "--num-records", "10",
+ "--throughput", "1",
+ "--record-size", "100",
+ "--producer-props", "bootstrap.servers=localhost:9000"};
+ producerPerformanceSpy.start(args);
+
+ verify(producerMock, times(10)).send(any(), any());
+ assertEquals(0, producerPerformanceSpy.stats.currentWindowCount());
+ assertEquals(0, producerPerformanceSpy.stats.totalCount());
+ verify(producerMock, times(1)).close();
+ }
+
@Test
public void testUnexpectedArg() {
String[] args = new String[] {
@@ -181,4 +232,26 @@ public class ProducerPerformanceTest {
long numRecords = Long.MAX_VALUE;
assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000));
}
+
+ @Test
+ public void testStatsCorrectness() throws Exception {
+ ExecutorService singleThreaded = Executors.newSingleThreadExecutor();
+ final long numRecords = 1000000;
+ ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000);
+ for (long i = 0; i < numRecords; i++) {
+ final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats);
+ CompletableFuture.runAsync(() -> {
+ callback.onCompletion(null, null);
+ }, singleThreaded);
+ }
+
+ singleThreaded.shutdown();
+ final boolean success = singleThreaded.awaitTermination(60, TimeUnit.SECONDS);
+
+ assertTrue(success, "should have terminated");
+ assertEquals(numRecords, stats.totalCount());
+ assertEquals(numRecords, stats.iteration());
+ assertEquals(500000, stats.index());
+ assertEquals(1000000 * 100, stats.bytes());
+ }
}