This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new ca001b3be8f [cherry-pick][branch-2.10] Check numMessages after
incrementing counter and NPE cause (#18885)
ca001b3be8f is described below
commit ca001b3be8f151001e9641b04391c5f2221dd726
Author: Xiangying Meng <[email protected]>
AuthorDate: Tue Dec 13 11:47:19 2022 +0800
[cherry-pick][branch-2.10] Check numMessages after incrementing counter and
NPE cause (#18885)
Co-authored-by: Lei Zhiyuan <[email protected]>
---
.../pulsar/broker/service/BrokerService.java | 7 +-
.../pulsar/testclient/PerformanceReader.java | 11 ++-
.../pulsar/tests/integration/cli/PerfToolTest.java | 110 +++++++++++++++++++++
3 files changed, 122 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 7aad0880bd4..29070eb753c 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2072,7 +2072,12 @@ public class BrokerService implements Closeable {
}
Map<String, String> data = optMap.get();
data.forEach((configKey, value) -> {
- Field configField =
dynamicConfigurationMap.get(configKey).field;
+ ConfigField configFieldWrapper =
dynamicConfigurationMap.get(configKey);
+ if (configFieldWrapper == null) {
+ log.warn("{} does not exist in
dynamicConfigurationMap, skip this config.", configKey);
+ return;
+ }
+ Field configField = configFieldWrapper.field;
Object newValue =
FieldParser.value(data.get(configKey), configField);
if (configField != null) {
Consumer listener =
configRegisteredListeners.get(configKey);
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
index 31559536560..05b56d366d1 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java
@@ -235,17 +235,18 @@ public class PerformanceReader {
PerfClientUtils.exit(0);
}
}
- if (arguments.numMessages > 0 && totalMessagesReceived.sum() >=
arguments.numMessages) {
- log.info("------------- DONE (reached the maximum number: [{}]
of consumption) --------------",
- arguments.numMessages);
- PerfClientUtils.exit(0);
- }
messagesReceived.increment();
bytesReceived.add(msg.getData().length);
totalMessagesReceived.increment();
totalBytesReceived.add(msg.getData().length);
+ if (arguments.numMessages > 0 && totalMessagesReceived.sum() >=
arguments.numMessages) {
+ log.info("------------- DONE (reached the maximum number: [{}]
of consumption) --------------",
+ arguments.numMessages);
+ PerfClientUtils.exit(0);
+ }
+
if (limiter != null) {
limiter.acquire();
}
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
new file mode 100644
index 00000000000..ecc8dba8546
--- /dev/null
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/PerfToolTest.java
@@ -0,0 +1,110 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.cli;
+
+import static org.testng.Assert.fail;
+import org.apache.pulsar.tests.integration.containers.ChaosContainer;
+import org.apache.pulsar.tests.integration.containers.PulsarContainer;
+import org.apache.pulsar.tests.integration.containers.ZKContainer;
+import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
+import org.apache.pulsar.tests.integration.messaging.TopicMessagingBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+public class PerfToolTest extends TopicMessagingBase {
+
+ private static final int MESSAGE_COUNT = 50;
+
+ @Test
+ public void testProduce() throws Exception {
+ String serviceUrl = "pulsar://" +
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+ final String topicName = getNonPartitionedTopic("testProduce", true);
+ // Using the ZK container as it is separate from brokers, so its
environment resembles real world usage more
+ ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+ ContainerExecResult produceResult =
produceWithPerfTool(clientToolContainer, serviceUrl, topicName, MESSAGE_COUNT);
+ checkOutputForLogs(produceResult,"PerformanceProducer - Aggregated
throughput stats",
+ "PerformanceProducer - Aggregated latency stats");
+ }
+
+ @Test
+ public void testConsume() throws Exception {
+ String serviceUrl = "pulsar://" +
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+ final String topicName = getNonPartitionedTopic("testConsume", true);
+ // Using the ZK container as it is separate from brokers, so its
environment resembles real world usage more
+ ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+ ContainerExecResult consumeResult =
consumeWithPerfTool(clientToolContainer, serviceUrl, topicName);
+ checkOutputForLogs(consumeResult,"PerformanceConsumer - Aggregated
throughput stats",
+ "PerformanceConsumer - Aggregated latency stats");
+ }
+
+ @Test
+ public void testRead() throws Exception {
+ String serviceUrl = "pulsar://" +
pulsarCluster.getProxy().getContainerName() + ":" + PulsarContainer.BROKER_PORT;
+ final String topicName = getNonPartitionedTopic("testRead", true);
+ // Using the ZK container as it is separate from brokers, so its
environment resembles real world usage more
+ ZKContainer<?> clientToolContainer = pulsarCluster.getZooKeeper();
+ ContainerExecResult readResult = readWithPerfTool(clientToolContainer,
serviceUrl, topicName);
+ checkOutputForLogs(readResult,"PerformanceReader - Aggregated
throughput stats ",
+ "PerformanceReader - Aggregated latency stats");
+ }
+
+ private ContainerExecResult produceWithPerfTool(ChaosContainer<?>
container, String url, String topic, int messageCount) throws Exception {
+ ContainerExecResult result = container.execCmd("bin/pulsar-perf",
"produce", "-u", url, "-m", String.valueOf(messageCount), topic);
+
+ return failOnError("Performance producer", result);
+ }
+
+ private ContainerExecResult consumeWithPerfTool(ChaosContainer<?>
container, String url, String topic) throws Exception {
+ CompletableFuture<ContainerExecResult> resultFuture =
+ container.execCmdAsync("bin/pulsar-perf", "consume", "-u",
url, "-m", String.valueOf(MESSAGE_COUNT), topic);
+ produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
+
+ ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
+ return failOnError("Performance consumer", result);
+ }
+
+ private ContainerExecResult readWithPerfTool(ChaosContainer<?> container,
String url, String topic) throws Exception {
+ CompletableFuture<ContainerExecResult> resultFuture =
+ container.execCmdAsync("bin/pulsar-perf", "read", "-u", url,
"-n", String.valueOf(MESSAGE_COUNT), topic);
+ produceWithPerfTool(container, url, topic, MESSAGE_COUNT);
+
+ ContainerExecResult result = resultFuture.get(5, TimeUnit.SECONDS);
+ return failOnError("Performance consumer", result);
+ }
+
+ private static ContainerExecResult failOnError(String processDesc,
ContainerExecResult result) {
+ if (result.getExitCode() != 0) {
+ fail(processDesc + " failed. Command output:\n" +
result.getStdout()
+ + "\nError output:\n" + result.getStderr());
+ }
+ return result;
+ }
+
+ private static void checkOutputForLogs(ContainerExecResult result,
String... logs) {
+ String output = result.getStdout();
+ for (String log : logs) {
+ Assert.assertTrue(output.contains(log),
+ "command output did not contain log message '" + log +
"'.\nFull stdout is:\n" + output);
+ }
+ }
+
+}
\ No newline at end of file