This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 2a8706ceb28 [fix][cli] Check numMessages after incrementing counter
(#17826)
2a8706ceb28 is described below
commit 2a8706ceb28a0c12083d11188d840a9da7ddaf67
Author: Andras Beni <[email protected]>
AuthorDate: Thu Sep 29 11:07:22 2022 +0200
[fix][cli] Check numMessages after incrementing counter (#17826)
(cherry picked from commit df5e0e1869ff7ce55489e4a7853172fa37b2b59d)
---
.../pulsar/testclient/PerformanceReader.java | 11 +++----
.../pulsar/tests/integration/cli/PerfToolTest.java | 34 +++++++++++++++++-----
2 files changed, 32 insertions(+), 13 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index d18c76a2f8b..506d366bb99 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -229,17 +229,18 @@ public class PerformanceReader {
PerfClientUtils.exit(0);
}
}
- if (arguments.numMessages > 0 && totalMessagesReceived.sum() >=
arguments.numMessages) {
- log.info("------------- DONE (reached the maximum number: [{}]
of consumption) --------------", arguments.numMessages);
- printAggregatedStats();
- PerfClientUtils.exit(0);
- }
messagesReceived.increment();
bytesReceived.add(msg.getData().length);
totalMessagesReceived.increment();
totalBytesReceived.add(msg.getData().length);
+ if (arguments.numMessages > 0 && totalMessagesReceived.sum() >=
arguments.numMessages) {
+ log.info("------------- DONE (reached the maximum number: [{}]
of consumption) --------------",
+ arguments.numMessages);
+ PerfClientUtils.exit(0);
+ }
+
if (limiter != null) {
limiter.acquire();
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
index 55af57d3b52..f87d11531dc 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
@@ -28,26 +28,24 @@ import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
public class PerfToolTest extends TopicMessagingBase {
private static final int MESSAGE_COUNT = 50;
@Test
- private void testProduce() throws Exception {
+ public void testProduce() throws Exception {
String serviceUrl = "pulsar://" +
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
final String topicName = getNonPartitionedTopic("testProduce", true);
// Using the ZK container as it is separate from brokers, so its
environment resembles real world usage more
ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
- ContainerExecResult produceResult =
produceWithPerfTool(clientToolContainer, serviceUrl, topicName);
+ ContainerExecResult produceResult =
produceWithPerfTool(clientToolContainer, serviceUrl, topicName, MESSAGE_COUNT);
checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated
throughput stats",
"PerformanceProducer - Aggregated latency stats");
}
@Test
- private void testConsume() throws Exception {
+ public void testConsume() throws Exception {
String serviceUrl = "pulsar://" +
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
final String topicName = getNonPartitionedTopic("testConsume", true);
// Using the ZK container as it is separate from brokers, so its
environment resembles real world usage more
@@ -57,8 +55,19 @@ public class PerfToolTest extends TopicMessagingBase {
"PerformanceConsumer - Aggregated latency stats");
}
- private ContainerExecResult produceWithPerfTool(ChaosContainer<?>
container, String url, String topic) throws Exception {
- ContainerExecResult result = container.execCmd("bin/pulsar-perf",
"produce", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);
+ @Test
+ public void testRead() throws Exception {
+ String serviceUrl = "pulsar://" +
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+ final String topicName = getNonPartitionedTopic("testRead", true);
+ // Using the ZK container as it is separate from brokers, so its
environment resembles real world usage more
+ ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+ ContainerExecResult readResult = readWithPerfTool(clientToolContainer,
serviceUrl, topicName);
+ checkOutputForLogs(readResult,"PerformanceReader - Aggregated
throughput stats ",
+ "PerformanceReader - Aggregated latency stats");
+ }
+
+ private ContainerExecResult produceWithPerfTool(ChaosContainer<?>
container, String url, String topic, int messageCount) throws Exception {
+ ContainerExecResult result = container.execCmd("bin/pulsar-perf",
"produce", "-u", url, "-m", String.valueOf(messageCount), topic);
return failOnError("Performance producer", result);
}
@@ -66,7 +75,16 @@ public class PerfToolTest extends TopicMessagingBase {
private ContainerExecResult consumeWithPerfTool(ChaosContainer<?>
container, String url, String topic) throws Exception {
CompletableFuture<ContainerExecResult> resultFuture =
container.execCmdAsync("bin/pulsar-perf", "consume", "-u",
url, "-m", String.valueOf(MESSAGE_COUNT), topic);
- produceWithPerfTool(container, url, topic);
+ produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
+
+ ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
+ return failOnError("Performance consumer", result);
+ }
+
+ private ContainerExecResult readWithPerfTool(ChaosContainer<?> container,
String url, String topic) throws Exception {
+ CompletableFuture<ContainerExecResult> resultFuture =
+ container.execCmdAsync("bin/pulsar-perf", "read", "-u", url,
"-n", String.valueOf(MESSAGE_COUNT), topic);
+ produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
return failOnError("Performance consumer", result);