This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aef004edeec KAFKA-14812:ProducerPerformance still counting successful 
sending in console when sending failed (#13404)
aef004edeec is described below

commit aef004edeec36ac958dbb5499522115b7e99803b
Author: hudeqi <[email protected]>
AuthorDate: Tue Mar 21 16:59:18 2023 +0800

    KAFKA-14812:ProducerPerformance still counting successful sending in 
console when sending failed (#13404)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/tools/ProducerPerformance.java    | 47 ++++++++++----
 .../kafka/tools/ProducerPerformanceTest.java       | 73 ++++++++++++++++++++++
 2 files changed, 107 insertions(+), 13 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index 6774f235114..d8a0f260691 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -94,7 +94,7 @@ public class ProducerPerformance {
             }
             Random random = new Random(0);
             ProducerRecord<byte[], byte[]> record;
-            Stats stats = new Stats(numRecords, 5000);
+            stats = new Stats(numRecords, 5000);
             long startMs = System.currentTimeMillis();
 
             ThroughputThrottler throttler = new 
ThroughputThrottler(throughput, startMs);
@@ -113,7 +113,7 @@ public class ProducerPerformance {
                 record = new ProducerRecord<>(topicName, payload);
 
                 long sendStartMs = System.currentTimeMillis();
-                Callback cb = stats.nextCompletion(sendStartMs, 
payload.length, stats);
+                cb = new PerfCallback(sendStartMs, payload.length, stats);
                 producer.send(record, cb);
 
                 currentTransactionSize++;
@@ -164,6 +164,10 @@ public class ProducerPerformance {
         return new KafkaProducer<>(props);
     }
 
+    Callback cb;
+
+    Stats stats;
+
     static byte[] generateRandomPayload(Integer recordSize, List<byte[]> 
payloadByteList, byte[] payload,
             Random random) {
         if (!payloadByteList.isEmpty()) {
@@ -363,7 +367,7 @@ public class ProducerPerformance {
             this.reportingInterval = reportingInterval;
         }
 
-        public void record(long iter, int latency, int bytes, long time) {
+        public void record(int latency, int bytes, long time) {
             this.count++;
             this.bytes += bytes;
             this.totalLatency += latency;
@@ -372,7 +376,7 @@ public class ProducerPerformance {
             this.windowBytes += bytes;
             this.windowTotalLatency += latency;
             this.windowMaxLatency = Math.max(windowMaxLatency, latency);
-            if (iter % this.sampling == 0) {
+            if (this.iteration % this.sampling == 0) {
                 this.latencies[index] = latency;
                 this.index++;
             }
@@ -383,10 +387,24 @@ public class ProducerPerformance {
             }
         }
 
-        public Callback nextCompletion(long start, int bytes, Stats stats) {
-            Callback cb = new PerfCallback(this.iteration, start, bytes, 
stats);
-            this.iteration++;
-            return cb;
+        public long totalCount() {
+            return this.count;
+        }
+
+        public long currentWindowCount() {
+            return this.windowCount;
+        }
+
+        public long iteration() {
+            return this.iteration;
+        }
+
+        public long bytes() {
+            return this.bytes;
+        }
+
+        public int index() {
+            return this.index;
         }
 
         public void printWindow() {
@@ -438,23 +456,26 @@ public class ProducerPerformance {
         }
     }
 
-    private static final class PerfCallback implements Callback {
+    static final class PerfCallback implements Callback {
         private final long start;
-        private final long iteration;
         private final int bytes;
         private final Stats stats;
 
-        public PerfCallback(long iter, long start, int bytes, Stats stats) {
+        public PerfCallback(long start, int bytes, Stats stats) {
             this.start = start;
             this.stats = stats;
-            this.iteration = iter;
             this.bytes = bytes;
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) 
{
             long now = System.currentTimeMillis();
             int latency = (int) (now - start);
-            this.stats.record(iteration, latency, bytes, now);
+            // Record stats only when the send succeeded; otherwise failed sends would be
+            // counted and reported in the console output as successfully sent records.
+            if (exception == null) {
+                this.stats.record(latency, bytes, now);
+                this.stats.iteration++;
+            }
             if (exception != null)
                 exception.printStackTrace();
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index b8adf5d8eb2..70717938845 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.tools;
 
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -35,13 +37,19 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -104,6 +112,49 @@ public class ProducerPerformanceTest {
         verify(producerMock, times(1)).close();
     }
 
+    @Test
+    public void testNumberOfSuccessfulSendAndClose() throws IOException {
+        
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+        doAnswer(invocation -> {
+            producerPerformanceSpy.cb.onCompletion(null, null);
+            return null;
+        }).when(producerMock).send(any(), any());
+
+        String[] args = new String[] {
+            "--topic", "Hello-Kafka",
+            "--num-records", "10",
+            "--throughput", "1",
+            "--record-size", "100",
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        producerPerformanceSpy.start(args);
+
+        verify(producerMock, times(10)).send(any(), any());
+        assertEquals(10, producerPerformanceSpy.stats.totalCount());
+        verify(producerMock, times(1)).close();
+    }
+
+    @Test
+    public void testNumberOfFailedSendAndClose() throws IOException {
+        
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
+        doAnswer(invocation -> {
+            producerPerformanceSpy.cb.onCompletion(null, new 
AuthorizationException("not authorized."));
+            return null;
+        }).when(producerMock).send(any(), any());
+
+        String[] args = new String[] {
+            "--topic", "Hello-Kafka",
+            "--num-records", "10",
+            "--throughput", "1",
+            "--record-size", "100",
+            "--producer-props", "bootstrap.servers=localhost:9000"};
+        producerPerformanceSpy.start(args);
+
+        verify(producerMock, times(10)).send(any(), any());
+        assertEquals(0, producerPerformanceSpy.stats.currentWindowCount());
+        assertEquals(0, producerPerformanceSpy.stats.totalCount());
+        verify(producerMock, times(1)).close();
+    }
+
     @Test
     public void testUnexpectedArg() {
         String[] args = new String[] {
@@ -181,4 +232,26 @@ public class ProducerPerformanceTest {
         long numRecords = Long.MAX_VALUE;
         assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 
5000));
     }
+
+    @Test
+    public void testStatsCorrectness() throws Exception {
+        ExecutorService singleThreaded = Executors.newSingleThreadExecutor();
+        final long numRecords = 1000000;
+        ProducerPerformance.Stats stats = new 
ProducerPerformance.Stats(numRecords, 5000);
+        for (long i = 0; i < numRecords; i++) {
+            final Callback callback = new ProducerPerformance.PerfCallback(0, 
100, stats);
+            CompletableFuture.runAsync(() -> {
+                callback.onCompletion(null, null);
+            }, singleThreaded);
+        }
+
+        singleThreaded.shutdown();
+        final boolean success = singleThreaded.awaitTermination(60, 
TimeUnit.SECONDS);
+
+        assertTrue(success, "should have terminated");
+        assertEquals(numRecords, stats.totalCount());
+        assertEquals(numRecords, stats.iteration());
+        assertEquals(500000, stats.index());
+        assertEquals(1000000 * 100, stats.bytes());
+    }
 }

Reply via email to