This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 279c2376328 Revert "MINOR: Fixed ProducerPerformance still counting
successful sending when sending failed (#13348)" (#13401)
279c2376328 is described below
commit 279c237632878347f0a7bbb5f09a5f9924829419
Author: Chia-Ping Tsai <[email protected]>
AuthorDate: Thu Mar 16 21:26:01 2023 +0800
Revert "MINOR: Fixed ProducerPerformance still counting successful sending
when sending failed (#13348)" (#13401)
This reverts commit 8e4c0d0b04580cde1e15e7b4793bae747d38999c.
Reviewers: Luke Chen <[email protected]>
---
.../apache/kafka/tools/ProducerPerformance.java | 28 +++-----------
.../kafka/tools/ProducerPerformanceTest.java | 45 ----------------------
2 files changed, 6 insertions(+), 67 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
index cf4f2b05514..6774f235114 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
@@ -94,7 +94,7 @@ public class ProducerPerformance {
}
Random random = new Random(0);
ProducerRecord<byte[], byte[]> record;
- stats = new Stats(numRecords, 5000);
+ Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
@@ -113,7 +113,7 @@ public class ProducerPerformance {
record = new ProducerRecord<>(topicName, payload);
long sendStartMs = System.currentTimeMillis();
- cb = stats.nextCompletion(sendStartMs, payload.length);
+ Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
currentTransactionSize++;
@@ -164,10 +164,6 @@ public class ProducerPerformance {
return new KafkaProducer<>(props);
}
- Callback cb;
-
- Stats stats;
-
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
Random random) {
if (!payloadByteList.isEmpty()) {
@@ -387,16 +383,9 @@ public class ProducerPerformance {
}
}
- public long totalCount() {
- return this.count;
- }
-
- public long currentWindowCount() {
- return this.windowCount;
- }
-
- public Callback nextCompletion(long start, int bytes) {
- Callback cb = new PerfCallback(this.iteration, start, bytes, this);
+ public Callback nextCompletion(long start, int bytes, Stats stats) {
+ Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
+ this.iteration++;
return cb;
}
@@ -465,12 +454,7 @@ public class ProducerPerformance {
public void onCompletion(RecordMetadata metadata, Exception exception) {
long now = System.currentTimeMillis();
int latency = (int) (now - start);
- // It will only be counted when the sending is successful, otherwise the number of sent records may be
- // magically printed when the sending fails.
- if (exception == null) {
- this.stats.record(iteration, latency, bytes, now);
- this.stats.iteration++;
- }
+ this.stats.record(iteration, latency, bytes, now);
if (exception != null)
exception.printStackTrace();
}
diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
index aa97e269bb5..b8adf5d8eb2 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java
@@ -17,7 +17,6 @@
package org.apache.kafka.tools;
import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.common.errors.AuthorizationException;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -43,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -106,49 +104,6 @@ public class ProducerPerformanceTest {
verify(producerMock, times(1)).close();
}
- @Test
- public void testNumberOfSuccessfulSendAndClose() throws IOException {
- doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
- doAnswer(invocation -> {
- producerPerformanceSpy.cb.onCompletion(null, null);
- return null;
- }).when(producerMock).send(any(), any());
-
- String[] args = new String[] {
- "--topic", "Hello-Kafka",
- "--num-records", "10",
- "--throughput", "1",
- "--record-size", "100",
- "--producer-props", "bootstrap.servers=localhost:9000"};
- producerPerformanceSpy.start(args);
-
- verify(producerMock, times(10)).send(any(), any());
- assertEquals(10, producerPerformanceSpy.stats.totalCount());
- verify(producerMock, times(1)).close();
- }
-
- @Test
- public void testNumberOfFailedSendAndClose() throws IOException {
- doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
- doAnswer(invocation -> {
- producerPerformanceSpy.cb.onCompletion(null, new AuthorizationException("not authorized."));
- return null;
- }).when(producerMock).send(any(), any());
-
- String[] args = new String[] {
- "--topic", "Hello-Kafka",
- "--num-records", "10",
- "--throughput", "1",
- "--record-size", "100",
- "--producer-props", "bootstrap.servers=localhost:9000"};
- producerPerformanceSpy.start(args);
-
- verify(producerMock, times(10)).send(any(), any());
- assertEquals(0, producerPerformanceSpy.stats.currentWindowCount());
- assertEquals(0, producerPerformanceSpy.stats.totalCount());
- verify(producerMock, times(1)).close();
- }
-
@Test
public void testUnexpectedArg() {
String[] args = new String[] {