This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 2a8706ceb28 [fix][cli] Check numMessages after incrementing counter 
(#17826)
2a8706ceb28 is described below

commit 2a8706ceb28a0c12083d11188d840a9da7ddaf67
Author: Andras Beni <[email protected]>
AuthorDate: Thu Sep 29 11:07:22 2022 +0200

    [fix][cli] Check numMessages after incrementing counter (#17826)
    
    (cherry picked from commit df5e0e1869ff7ce55489e4a7853172fa37b2b59d)
---
 .../pulsar/testclient/PerformanceReader.java       | 11 +++----
 .../pulsar/tests/integration/cli/PerfToolTest.java | 34 +++++++++++++++++-----
 2 files changed, 32 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index d18c76a2f8b..506d366bb99 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -229,17 +229,18 @@ public class PerformanceReader {
                     PerfClientUtils.exit(0);
                 }
             }
-            if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= 
arguments.numMessages) {
-                log.info("------------- DONE (reached the maximum number: [{}] 
of consumption) --------------", arguments.numMessages);
-                printAggregatedStats();
-                PerfClientUtils.exit(0);
-            }
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
 
             totalMessagesReceived.increment();
             totalBytesReceived.add(msg.getData().length);
 
+            if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= 
arguments.numMessages) {
+                log.info("------------- DONE (reached the maximum number: [{}] 
of consumption) --------------",
+                        arguments.numMessages);
+                PerfClientUtils.exit(0);
+            }
+
             if (limiter != null) {
                 limiter.acquire();
             }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
index 55af57d3b52..f87d11531dc 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
@@ -28,26 +28,24 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 public class PerfToolTest extends TopicMessagingBase {
 
     private static final int MESSAGE_COUNT = 50;
 
     @Test
-    private void testProduce() throws Exception {
+    public void testProduce() throws Exception {
         String serviceUrl = "pulsar://" + 
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
         final String topicName = getNonPartitionedTopic("testProduce", true);
         // Using the ZK container as it is separate from brokers, so its 
environment resembles real world usage more
         ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
-        ContainerExecResult produceResult = 
produceWithPerfTool(clientToolContainer, serviceUrl, topicName);
+        ContainerExecResult produceResult = 
produceWithPerfTool(clientToolContainer, serviceUrl, topicName, MESSAGE_COUNT);
         checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated 
throughput stats",
                 "PerformanceProducer - Aggregated latency stats");
     }
 
     @Test
-    private void testConsume() throws Exception {
+    public void testConsume() throws Exception {
         String serviceUrl = "pulsar://" + 
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
         final String topicName = getNonPartitionedTopic("testConsume", true);
         // Using the ZK container as it is separate from brokers, so its 
environment resembles real world usage more
@@ -57,8 +55,19 @@ public class PerfToolTest extends TopicMessagingBase {
                 "PerformanceConsumer - Aggregated latency stats");
     }
 
-    private ContainerExecResult produceWithPerfTool(ChaosContainer<?> 
container, String url, String topic) throws Exception {
-        ContainerExecResult result = container.execCmd("bin/pulsar-perf", 
"produce", "-u", url, "-m", String.valueOf(MESSAGE_COUNT), topic);
+    @Test
+    public void testRead() throws Exception {
+        String serviceUrl = "pulsar://" + 
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+        final String topicName = getNonPartitionedTopic("testRead", true);
+        // Using the ZK container as it is separate from brokers, so its 
environment resembles real world usage more
+        ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+        ContainerExecResult readResult = readWithPerfTool(clientToolContainer, 
serviceUrl, topicName);
+        checkOutputForLogs(readResult,"PerformanceReader - Aggregated 
throughput stats ",
+                "PerformanceReader - Aggregated latency stats");
+    }
+
+    private ContainerExecResult produceWithPerfTool(ChaosContainer<?> 
container, String url, String topic, int messageCount) throws Exception {
+        ContainerExecResult result = container.execCmd("bin/pulsar-perf", 
"produce", "-u", url, "-m", String.valueOf(messageCount), topic);
 
         return failOnError("Performance producer", result);
     }
@@ -66,7 +75,16 @@ public class PerfToolTest extends TopicMessagingBase {
     private ContainerExecResult consumeWithPerfTool(ChaosContainer<?> 
container, String url, String topic) throws Exception {
         CompletableFuture<ContainerExecResult> resultFuture =
                 container.execCmdAsync("bin/pulsar-perf", "consume", "-u", 
url, "-m", String.valueOf(MESSAGE_COUNT), topic);
-        produceWithPerfTool(container, url, topic);
+        produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
+
+        ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
+        return failOnError("Performance consumer", result);
+    }
+
+    private ContainerExecResult readWithPerfTool(ChaosContainer<?> container, 
String url, String topic) throws Exception {
+        CompletableFuture<ContainerExecResult> resultFuture =
+                container.execCmdAsync("bin/pulsar-perf", "read", "-u", url, 
"-n", String.valueOf(MESSAGE_COUNT), topic);
+        produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
 
         ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
         return failOnError("Performance consumer", result);

Reply via email to