This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch revert-13348-hdq_fix
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 46961cd3eb5094a76d73360856f9666c53eced35
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Thu Mar 16 15:49:13 2023 +0800

    Revert "MINOR: Fixed ProducerPerformance still counting successful sending 
when sending failed (#13348)"
    
    This reverts commit 8e4c0d0b04580cde1e15e7b4793bae747d38999c.
---
 .../apache/kafka/tools/ProducerPerformance.java    | 28 +++-----------
 .../kafka/tools/ProducerPerformanceTest.java       | 45 ----------------------
 2 files changed, 6 insertions(+), 67 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java 
b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index cf4f2b05514..6774f235114 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -94,7 +94,7 @@ public class ProducerPerformance {
             }
             Random random = new Random(0);
             ProducerRecord<byte[], byte[]> record;
-            stats = new Stats(numRecords, 5000);
+            Stats stats = new Stats(numRecords, 5000);
             long startMs = System.currentTimeMillis();
 
             ThroughputThrottler throttler = new 
ThroughputThrottler(throughput, startMs);
@@ -113,7 +113,7 @@ public class ProducerPerformance {
                 record = new ProducerRecord<>(topicName, payload);
 
                 long sendStartMs = System.currentTimeMillis();
-                cb = stats.nextCompletion(sendStartMs, payload.length);
+                Callback cb = stats.nextCompletion(sendStartMs, 
payload.length, stats);
                 producer.send(record, cb);
 
                 currentTransactionSize++;
@@ -164,10 +164,6 @@ public class ProducerPerformance {
         return new KafkaProducer<>(props);
     }
 
-    Callback cb;
-
-    Stats stats;
-
     static byte[] generateRandomPayload(Integer recordSize, List<byte[]> 
payloadByteList, byte[] payload,
             Random random) {
         if (!payloadByteList.isEmpty()) {
@@ -387,16 +383,9 @@ public class ProducerPerformance {
             }
         }
 
-        public long totalCount() {
-            return this.count;
-        }
-
-        public long currentWindowCount() {
-            return this.windowCount;
-        }
-
-        public Callback nextCompletion(long start, int bytes) {
-            Callback cb = new PerfCallback(this.iteration, start, bytes, this);
+        public Callback nextCompletion(long start, int bytes, Stats stats) {
+            Callback cb = new PerfCallback(this.iteration, start, bytes, 
stats);
+            this.iteration++;
             return cb;
         }
 
@@ -465,12 +454,7 @@ public class ProducerPerformance {
         public void onCompletion(RecordMetadata metadata, Exception exception) 
{
             long now = System.currentTimeMillis();
             int latency = (int) (now - start);
-            // It will only be counted when the sending is successful, 
otherwise the number of sent records may be
-            // magically printed when the sending fails.
-            if (exception == null) {
-                this.stats.record(iteration, latency, bytes, now);
-                this.stats.iteration++;
-            }
+            this.stats.record(iteration, latency, bytes, now);
             if (exception != null)
                 exception.printStackTrace();
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java 
b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index aa97e269bb5..b8adf5d8eb2 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.tools;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.errors.AuthorizationException;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -43,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -106,49 +104,6 @@ public class ProducerPerformanceTest {
         verify(producerMock, times(1)).close();
     }
 
-    @Test
-    public void testNumberOfSuccessfulSendAndClose() throws IOException {
-        
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
-        doAnswer(invocation -> {
-            producerPerformanceSpy.cb.onCompletion(null, null);
-            return null;
-        }).when(producerMock).send(any(), any());
-
-        String[] args = new String[] {
-            "--topic", "Hello-Kafka",
-            "--num-records", "10",
-            "--throughput", "1",
-            "--record-size", "100",
-            "--producer-props", "bootstrap.servers=localhost:9000"};
-        producerPerformanceSpy.start(args);
-
-        verify(producerMock, times(10)).send(any(), any());
-        assertEquals(10, producerPerformanceSpy.stats.totalCount());
-        verify(producerMock, times(1)).close();
-    }
-
-    @Test
-    public void testNumberOfFailedSendAndClose() throws IOException {
-        
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
-        doAnswer(invocation -> {
-            producerPerformanceSpy.cb.onCompletion(null, new 
AuthorizationException("not authorized."));
-            return null;
-        }).when(producerMock).send(any(), any());
-
-        String[] args = new String[] {
-            "--topic", "Hello-Kafka",
-            "--num-records", "10",
-            "--throughput", "1",
-            "--record-size", "100",
-            "--producer-props", "bootstrap.servers=localhost:9000"};
-        producerPerformanceSpy.start(args);
-
-        verify(producerMock, times(10)).send(any(), any());
-        assertEquals(0, producerPerformanceSpy.stats.currentWindowCount());
-        assertEquals(0, producerPerformanceSpy.stats.totalCount());
-        verify(producerMock, times(1)).close();
-    }
-
     @Test
     public void testUnexpectedArg() {
         String[] args = new String[] {

Reply via email to