This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e4630a9 Add support for deduplication testing (#10246)
e4630a9 is described below
commit e4630a950d40a9dceac02aeaabd8dcd819ac0428
Author: Kevin Wilson <[email protected]>
AuthorDate: Wed Apr 21 12:57:44 2021 -0600
Add support for deduplication testing (#10246)
* Add support for deduplication testing
* Move to fasterxml.jackson for json parsing in test case.
---
pulsar-testclient/pom.xml | 6 ++
.../pulsar/testclient/DefaultMessageFormatter.java | 115 +++++++++++++++++++++
.../pulsar/testclient/IMessageFormatter.java | 23 +++++
.../pulsar/testclient/PerformanceProducer.java | 53 +++++++++-
.../pulsar/testclient/PerformanceProducerTest.java | 13 +++
.../testclient/TestDefaultMessageFormatter.java | 74 +++++++++++++
6 files changed, 280 insertions(+), 4 deletions(-)
diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml
index a244e15..026c1cc 100644
--- a/pulsar-testclient/pom.xml
+++ b/pulsar-testclient/pom.xml
@@ -95,5 +95,11 @@
<artifactId>HdrHistogram</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+
</dependencies>
</project>
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
new file mode 100644
index 0000000..a84fb2d
--- /dev/null
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+public class DefaultMessageFormatter implements IMessageFormatter {
+ Random r = new Random();
+
+
+ @Override
+ public byte[] formatMessage(String producerName, long msgId, byte[]
message) {
+ String sMessage = new String(message, StandardCharsets.UTF_8);
+ if (producerName != null && !producerName.isEmpty()) {
+ sMessage = sMessage.replaceAll("%p", producerName);
+ }
+ sMessage = sMessage.replaceAll("%i", String.valueOf(msgId));
+ sMessage = sMessage.replaceAll("%t",
String.valueOf(System.nanoTime()));
+
+ int idx = sMessage.indexOf("%");
+ while (idx > 0) {
+
+ float size = 0;
+ int i=1;
+ for (; idx+i < sMessage.length(); i++) {
+ char c = sMessage.charAt(idx + i);
+ if (Character.isDigit(c) && c != '.') {
+ continue;
+ }
+ if (c == '.' || c == '-') {
+ continue;
+ }
+ break;
+ }
+ if (i != 1) {
+ size = Float.valueOf(new
String(sMessage.substring(idx+1,idx+i)));
+ }
+
+ String sub = sMessage.substring(idx, idx+i+1);
+
+ if (sMessage.charAt(idx+i) == 'f') {
+ sMessage=sMessage.replaceFirst(sub, getFloatValue(size));
+ } else if (sMessage.charAt(idx+i) == 'l') {
+ sMessage=sMessage.replaceFirst(sub, getLongValue(size));
+ } else if (sMessage.charAt(idx+i) == 'd') {
+ sMessage = sMessage.replaceFirst(sub, getIntValue(size));
+ } else if (sMessage.charAt(idx+i) == 's') {
+ sMessage = sMessage.replaceFirst(sub, getStringValue(size));
+ }
+ idx = sMessage.indexOf("%", idx);
+ }
+ return sMessage.getBytes(StandardCharsets.UTF_8);
+ }
+
+ private float _getFloatValue(float size) {
+ float f = r.nextFloat();
+ int mag = (int) Math.abs(size);
+ f = f * (float) Math.pow(10, mag);
+ if (size < 0 && ((int) f) % 2 == 1) {
+ return f * -1;
+ }
+ return f;
+ }
+
+ private String getStringValue(float size) {
+ int s = (int) size;
+ if (size == 0) {
+ size = 20;
+ };
+ String result = "";
+ for(int i = 0; i < s; i++) {
+ result = result + (char) ((int) 'a' + (int) (r.nextFloat() * 26));
+ }
+ return result;
+ }
+
+ private String getFloatValue(float size) {
+ if (size == 0) {
+ return String.valueOf(r.nextFloat());
+ }
+ String format = "%" + String.valueOf(size) + "f";
+
+ return String.format(format, _getFloatValue(size));
+ }
+
+ private String getIntValue(float size) {
+ if (size == 0) {
+ return String.valueOf(r.nextInt());
+ }
+ return String.valueOf((int) _getFloatValue(size));
+ }
+ private String getLongValue(float size) {
+ if (size == 0) {
+ return String.valueOf(r.nextLong());
+ }
+ return String.valueOf((long) _getFloatValue(size));
+ }
+}
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/IMessageFormatter.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/IMessageFormatter.java
new file mode 100644
index 0000000..e80ae50
--- /dev/null
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/IMessageFormatter.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+public interface IMessageFormatter {
+ byte[] formatMessage(String producerName, long msg, byte[] message);
+}
\ No newline at end of file
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index ece6853..94aef5f 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -94,6 +94,8 @@ public class PerformanceProducer {
private static Recorder recorder = new
Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
private static Recorder cumulativeRecorder = new
Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
+ private static IMessageFormatter messageFormatter = null;
+
@Parameters(commandDescription = "Test pulsar producer performance.")
static class Arguments {
@@ -121,6 +123,15 @@ public class PerformanceProducer {
@Parameter(names = { "-n", "--num-producers" }, description = "Number
of producers (per topic)")
public int numProducers = 1;
+ @Parameter(names = {"--separator"}, description = "Separator between
the topic and topic number")
+ public String separator = "-";
+
+ @Parameter(names = {"--send-timeout"}, description = "Set the
sendTimeout value default 0 to keep compatibility with previous version of
pulsar-perf")
+ public int sendTimeout = 0;
+
+ @Parameter(names = { "-pn", "--producer-name" }, description =
"Producer Name")
+ public String producerName = null;
+
@Parameter(names = { "-u", "--service-url" }, description = "Pulsar
Service URL")
public String serviceURL;
@@ -229,6 +240,13 @@ public class PerformanceProducer {
@Parameter(names = { "-am", "--access-mode" }, description = "Producer
access mode")
public ProducerAccessMode producerAccessMode =
ProducerAccessMode.Shared;
+
+ @Parameter(names = { "-fp", "--format-payload" },
+ description = "Format %i as a message index in the stream from
producer and/or %t as the timestamp nanoseconds.")
+ public boolean formatPayload = false;
+
+ @Parameter(names = {"-fc", "--format-class"}, description="Custom
Formatter class name")
+ public String formatterClass =
"org.apache.pulsar.testclient.DefaultMessageFormatter";
}
public static void main(String[] args) throws Exception {
@@ -256,7 +274,7 @@ public class PerformanceProducer {
String prefixTopicName = arguments.topics.get(0);
List<String> defaultTopics = Lists.newArrayList();
for (int i = 0; i < arguments.numTopics; i++) {
- defaultTopics.add(String.format("%s-%d", prefixTopicName,
i));
+ defaultTopics.add(String.format("%s%s%d", prefixTopicName,
arguments.separator, i));
}
arguments.topics = defaultTopics;
} else {
@@ -332,6 +350,10 @@ public class PerformanceProducer {
for (String payload : payloadList) {
payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
}
+
+ if (arguments.formatPayload) {
+ messageFormatter =
getMessageFormatter(arguments.formatterClass);
+ }
} else {
for (int i = 0; i < payloadBytes.length; ++i) {
payloadBytes[i] = (byte) (random.nextInt(26) + 65);
@@ -388,6 +410,7 @@ public class PerformanceProducer {
executor.submit(() -> {
log.info("Started performance test thread {}", threadIdx);
runProducer(
+ threadIdx,
arguments,
numMessagesPerThread,
msgRatePerThread,
@@ -453,7 +476,18 @@ public class PerformanceProducer {
}
}
- private static void runProducer(Arguments arguments,
+ static IMessageFormatter getMessageFormatter(String formatterClass) {
+ try {
+ ClassLoader classLoader =
PerformanceProducer.class.getClassLoader();
+ Class clz = classLoader.loadClass(formatterClass);
+ return (IMessageFormatter) clz.newInstance();
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
+ private static void runProducer(int producerId,
+ Arguments arguments,
long numMessages,
int msgRate,
List<byte[]> payloadByteList,
@@ -486,7 +520,7 @@ public class PerformanceProducer {
client = clientBuilder.build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
- .sendTimeout(0, TimeUnit.SECONDS) //
+ .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
.compressionType(arguments.compression) //
.maxPendingMessages(arguments.maxOutstanding) //
.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
@@ -494,6 +528,11 @@ public class PerformanceProducer {
// enable round robin message routing if it is a
partitioned topic
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+ if (arguments.producerName != null) {
+ String producerName = String.format("%s%s%d",
arguments.producerName, arguments.separator, producerId);
+ producerBuilder.producerName(producerName);
+ }
+
if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages
== 0) {
producerBuilder.enableBatching(false);
} else {
@@ -516,6 +555,7 @@ public class PerformanceProducer {
}
for (int i = 0; i < arguments.numTopics; i++) {
+
String topic = arguments.topics.get(i);
log.info("Adding {} publishers on topic {}",
arguments.numProducers, topic);
@@ -580,7 +620,12 @@ public class PerformanceProducer {
byte[] payloadData;
if (arguments.payloadFilename != null) {
- payloadData =
payloadByteList.get(random.nextInt(payloadByteList.size()));
+ if (messageFormatter != null) {
+ payloadData =
messageFormatter.formatMessage(arguments.producerName, totalSent,
+
payloadByteList.get(random.nextInt(payloadByteList.size())));
+ } else {
+ payloadData =
payloadByteList.get(random.nextInt(payloadByteList.size()));
+ }
} else {
payloadData = payloadBytes;
}
diff --git
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 716a05a..01588c6 100644
---
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -176,4 +176,17 @@ public class PerformanceProducerTest extends
MockedPulsarServiceBaseTest {
thread.join();
Assert.assertEquals(10,
pulsar.getAdminClient().topics().getPartitionedTopicMetadata(topic).partitions);
}
+
+ @Test
+ public void testNotExistIMessageFormatter() {
+ IMessageFormatter msgFormatter =
PerformanceProducer.getMessageFormatter("org.apache.pulsar.testclient.NonExistentFormatter");
+ Assert.assertNull(msgFormatter);
+ }
+
+ @Test
+ public void testDefaultIMessageFormatter() {
+ IMessageFormatter msgFormatter =
PerformanceProducer.getMessageFormatter("org.apache.pulsar.testclient.DefaultMessageFormatter");
+ Assert.assertTrue(msgFormatter instanceof DefaultMessageFormatter);
+ }
+
}
diff --git
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
new file mode 100644
index 0000000..2d5c4b9
--- /dev/null
+++
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class TestDefaultMessageFormatter {
+
+ @Test
+ public void testFormatMessage() {
+ String producerName = "producer-1";
+ long msgId = 3;
+ byte[] message = "{ \"producer\": \"%p\", \"msgId\": %i, \"nanoTime\":
%t, \"float1\": %5.2f, \"float2\": %-5.2f, \"long1\": %12l, \"long2\": %l,
\"int1\": %d, \"int2\": %1d , \"long3\": %5l, \"str\": \"%5s\" }".getBytes();
+ byte[] formatted = new
DefaultMessageFormatter().formatMessage(producerName, msgId, message);
+ String jsonString = new String(formatted, StandardCharsets.UTF_8);
+
+ ObjectMapper objectMapper = new ObjectMapper();
+
+ JsonNode obj = null;
+ try {
+ obj = objectMapper.readValue(jsonString, JsonNode.class);
+
+ } catch(Exception jpe) {
+ Assert.fail("Exception parsing json");
+ }
+
+ String prod = obj.get("producer").asText();
+ int mid = obj.get("msgId").asInt();
+ long nt = obj.get("nanoTime").asLong();
+ float f1 = obj.get("float1").floatValue();
+ float f2 = obj.get("float2").floatValue();
+ long l1 = obj.get("long1").asLong();
+ long l2 = obj.get("long2").asLong();
+ long i1 = obj.get("int1").asInt();
+ long i2 = obj.get("int2").asInt();
+ String str = obj.get("str").asText();
+ long l3 = obj.get("long3").asLong();
+ Assert.assertEquals(producerName, prod);
+ Assert.assertEquals(msgId, mid);
+ Assert.assertTrue( nt > 0);
+ Assert.assertNotEquals(f1, f2);
+ Assert.assertNotEquals(l1, l2);
+ Assert.assertNotEquals(i1, i2);
+ Assert.assertTrue(l3 > 0);
+ Assert.assertTrue(l3 <= 99999);
+ Assert.assertTrue(i2 < 10);
+ Assert.assertTrue(0 < i2);
+ Assert.assertTrue(f2 < 100000);
+ Assert.assertTrue( -100000 < f2);
+
+ }
+}