This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new ca001b3be8f [cherry-pick][branch-2.10] Check numMessages after 
incrementing counter and NPE cause (#18885)
ca001b3be8f is described below

commit ca001b3be8f151001e9641b04391c5f2221dd726
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Dec 13 11:47:19 2022 +0800

    [cherry-pick][branch-2.10] Check numMessages after incrementing counter and 
NPE cause (#18885)
    
    Co-authored-by: Lei Zhiyuan <[email protected]>
---
 .../pulsar/broker/service/BrokerService.java       |   7 +-
 .../pulsar/testclient/PerformanceReader.java       |  11 ++-
 .../pulsar/tests/integration/cli/PerfToolTest.java | 110 +++++++++++++++++++++
 3 files changed, 122 insertions(+), 6 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7aad0880bd4..29070eb753c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2072,7 +2072,12 @@ public class BrokerService implements Closeable {
                     }
                     Map<String, String> data = optMap.get();
                     data.forEach((configKey, value) -> {
-                        Field configField = 
dynamicConfigurationMap.get(configKey).field;
+                        ConfigField configFieldWrapper = 
dynamicConfigurationMap.get(configKey);
+                        if (configFieldWrapper == null) {
+                            log.warn("{} does not exist in 
dynamicConfigurationMap, skip this config.", configKey);
+                            return;
+                        }
+                        Field configField = configFieldWrapper.field;
                         Object newValue = 
FieldParser.value(data.get(configKey), configField);
                         if (configField != null) {
                             Consumer listener = 
configRegisteredListeners.get(configKey);
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 31559536560..05b56d366d1 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -235,17 +235,18 @@ public class PerformanceReader {
                     PerfClientUtils.exit(0);
                 }
             }
-            if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= 
arguments.numMessages) {
-                log.info("------------- DONE (reached the maximum number: [{}] 
of consumption) --------------",
-                        arguments.numMessages);
-                PerfClientUtils.exit(0);
-            }
             messagesReceived.increment();
             bytesReceived.add(msg.getData().length);
 
             totalMessagesReceived.increment();
             totalBytesReceived.add(msg.getData().length);
 
+            if (arguments.numMessages > 0 && totalMessagesReceived.sum() >= 
arguments.numMessages) {
+                log.info("------------- DONE (reached the maximum number: [{}] 
of consumption) --------------",
+                        arguments.numMessages);
+                PerfClientUtils.exit(0);
+            }
+
             if (limiter != null) {
                 limiter.acquire();
             }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
new file mode 100644
index 00000000000..ecc8dba8546
--- /dev/null
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
@@ -0,0 +1,110 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli;
+
+import static org.testng.Assert.fail;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class PerfToolTest extends TopicMessagingBase {
+
+    private static final int MESSAGE_COUNT = 50;
+
+    @Test
+    public void testProduce() throws Exception {
+        String serviceUrl = "pulsar://" + 
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+        final String topicName = getNonPartitionedTopic("testProduce", true);
+        // Using the ZK container as it is separate from brokers, so its 
environment resembles real world usage more
+        ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+        ContainerExecResult produceResult = 
produceWithPerfTool(clientToolContainer, serviceUrl, topicName, MESSAGE_COUNT);
+        checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated 
throughput stats",
+                "PerformanceProducer - Aggregated latency stats");
+    }
+
+    @Test
+    public void testConsume() throws Exception {
+        String serviceUrl = "pulsar://" + 
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+        final String topicName = getNonPartitionedTopic("testConsume", true);
+        // Using the ZK container as it is separate from brokers, so its 
environment resembles real world usage more
+        ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+        ContainerExecResult consumeResult = 
consumeWithPerfTool(clientToolContainer, serviceUrl, topicName);
+        checkOutputForLogs(consumeResult,"PerformanceConsumer - Aggregated 
throughput stats",
+                "PerformanceConsumer - Aggregated latency stats");
+    }
+
+    @Test
+    public void testRead() throws Exception {
+        String serviceUrl = "pulsar://" + 
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+        final String topicName = getNonPartitionedTopic("testRead", true);
+        // Using the ZK container as it is separate from brokers, so its 
environment resembles real world usage more
+        ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+        ContainerExecResult readResult = readWithPerfTool(clientToolContainer, 
serviceUrl, topicName);
+        checkOutputForLogs(readResult,"PerformanceReader - Aggregated 
throughput stats ",
+                "PerformanceReader - Aggregated latency stats");
+    }
+
+    private ContainerExecResult produceWithPerfTool(ChaosContainer<?> 
container, String url, String topic, int messageCount) throws Exception {
+        ContainerExecResult result = container.execCmd("bin/pulsar-perf", 
"produce", "-u", url, "-m", String.valueOf(messageCount), topic);
+
+        return failOnError("Performance producer", result);
+    }
+
+    private ContainerExecResult consumeWithPerfTool(ChaosContainer<?> 
container, String url, String topic) throws Exception {
+        CompletableFuture<ContainerExecResult> resultFuture =
+                container.execCmdAsync("bin/pulsar-perf", "consume", "-u", 
url, "-m", String.valueOf(MESSAGE_COUNT), topic);
+        produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
+
+        ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
+        return failOnError("Performance consumer", result);
+    }
+
+    private ContainerExecResult readWithPerfTool(ChaosContainer<?> container, 
String url, String topic) throws Exception {
+        CompletableFuture<ContainerExecResult> resultFuture =
+                container.execCmdAsync("bin/pulsar-perf", "read", "-u", url, 
"-n", String.valueOf(MESSAGE_COUNT), topic);
+        produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
+
+        ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
+        return failOnError("Performance consumer", result);
+    }
+
+    private static ContainerExecResult failOnError(String processDesc, 
ContainerExecResult result) {
+        if (result.getExitCode() != 0) {
+            fail(processDesc + " failed. Command output:\n" + 
result.getStdout()
+                    + "\nError output:\n" + result.getStderr());
+        }
+        return result;
+    }
+
+    private static void checkOutputForLogs(ContainerExecResult result, 
String... logs) {
+        String output = result.getStdout();
+        for (String log : logs) {
+            Assert.assertTrue(output.contains(log),
+                    "command output did not contain log message '" + log + 
"'.\nFull stdout is:\n" + output);
+        }
+    }
+
+}
\ No newline at end of file

Reply via email to