This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch revert-13348-hdq_fix
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 46961cd3eb5094a76d73360856f9666c53eced35 Author: Chia-Ping Tsai <[email protected]> AuthorDate: Thu Mar 16 15:49:13 2023 +0800 Revert "MINOR: Fixed ProducerPerformance still counting successful sending when sending failed (#13348)" This reverts commit 8e4c0d0b04580cde1e15e7b4793bae747d38999c. --- .../apache/kafka/tools/ProducerPerformance.java | 28 +++----------- .../kafka/tools/ProducerPerformanceTest.java | 45 ---------------------- 2 files changed, 6 insertions(+), 67 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index cf4f2b05514..6774f235114 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -94,7 +94,7 @@ public class ProducerPerformance { } Random random = new Random(0); ProducerRecord<byte[], byte[]> record; - stats = new Stats(numRecords, 5000); + Stats stats = new Stats(numRecords, 5000); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs); @@ -113,7 +113,7 @@ public class ProducerPerformance { record = new ProducerRecord<>(topicName, payload); long sendStartMs = System.currentTimeMillis(); - cb = stats.nextCompletion(sendStartMs, payload.length); + Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats); producer.send(record, cb); currentTransactionSize++; @@ -164,10 +164,6 @@ public class ProducerPerformance { return new KafkaProducer<>(props); } - Callback cb; - - Stats stats; - static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload, Random random) { if (!payloadByteList.isEmpty()) { @@ -387,16 +383,9 @@ public class ProducerPerformance { } } - public long totalCount() { - return this.count; - } - - public long currentWindowCount() { - return this.windowCount; - } - - public Callback 
nextCompletion(long start, int bytes) { - Callback cb = new PerfCallback(this.iteration, start, bytes, this); + public Callback nextCompletion(long start, int bytes, Stats stats) { + Callback cb = new PerfCallback(this.iteration, start, bytes, stats); + this.iteration++; return cb; } @@ -465,12 +454,7 @@ public class ProducerPerformance { public void onCompletion(RecordMetadata metadata, Exception exception) { long now = System.currentTimeMillis(); int latency = (int) (now - start); - // It will only be counted when the sending is successful, otherwise the number of sent records may be - // magically printed when the sending fails. - if (exception == null) { - this.stats.record(iteration, latency, bytes, now); - this.stats.iteration++; - } + this.stats.record(iteration, latency, bytes, now); if (exception != null) exception.printStackTrace(); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index aa97e269bb5..b8adf5d8eb2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -17,7 +17,6 @@ package org.apache.kafka.tools; import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.common.errors.AuthorizationException; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -43,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -106,49 +104,6 @@ public class ProducerPerformanceTest { verify(producerMock, 
times(1)).close(); } - @Test - public void testNumberOfSuccessfulSendAndClose() throws IOException { - doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); - doAnswer(invocation -> { - producerPerformanceSpy.cb.onCompletion(null, null); - return null; - }).when(producerMock).send(any(), any()); - - String[] args = new String[] { - "--topic", "Hello-Kafka", - "--num-records", "10", - "--throughput", "1", - "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; - producerPerformanceSpy.start(args); - - verify(producerMock, times(10)).send(any(), any()); - assertEquals(10, producerPerformanceSpy.stats.totalCount()); - verify(producerMock, times(1)).close(); - } - - @Test - public void testNumberOfFailedSendAndClose() throws IOException { - doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); - doAnswer(invocation -> { - producerPerformanceSpy.cb.onCompletion(null, new AuthorizationException("not authorized.")); - return null; - }).when(producerMock).send(any(), any()); - - String[] args = new String[] { - "--topic", "Hello-Kafka", - "--num-records", "10", - "--throughput", "1", - "--record-size", "100", - "--producer-props", "bootstrap.servers=localhost:9000"}; - producerPerformanceSpy.start(args); - - verify(producerMock, times(10)).send(any(), any()); - assertEquals(0, producerPerformanceSpy.stats.currentWindowCount()); - assertEquals(0, producerPerformanceSpy.stats.totalCount()); - verify(producerMock, times(1)).close(); - } - @Test public void testUnexpectedArg() { String[] args = new String[] {
