This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e4630a9  Add support for deduplication testing (#10246)
e4630a9 is described below

commit e4630a950d40a9dceac02aeaabd8dcd819ac0428
Author: Kevin Wilson <[email protected]>
AuthorDate: Wed Apr 21 12:57:44 2021 -0600

    Add support for deduplication testing (#10246)
    
    * Add support for deduplication testing
    
    * Move to fasterxml.jackson for json parsing in test case.
---
 pulsar-testclient/pom.xml                          |   6 ++
 .../pulsar/testclient/DefaultMessageFormatter.java | 115 +++++++++++++++++++++
 .../pulsar/testclient/IMessageFormatter.java       |  23 +++++
 .../pulsar/testclient/PerformanceProducer.java     |  53 +++++++++-
 .../pulsar/testclient/PerformanceProducerTest.java |  13 +++
 .../testclient/TestDefaultMessageFormatter.java    |  74 +++++++++++++
 6 files changed, 280 insertions(+), 4 deletions(-)

diff --git a/pulsar-testclient/pom.xml b/pulsar-testclient/pom.xml
index a244e15..026c1cc 100644
--- a/pulsar-testclient/pom.xml
+++ b/pulsar-testclient/pom.xml
@@ -95,5 +95,11 @@
                        <artifactId>HdrHistogram</artifactId>
                </dependency>
 
+               <dependency>
+                       <groupId>com.fasterxml.jackson.core</groupId>
+                       <artifactId>jackson-databind</artifactId>
+               </dependency>
+
+
        </dependencies>
 </project>
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
new file mode 100644
index 0000000..a84fb2d
--- /dev/null
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/DefaultMessageFormatter.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import java.nio.charset.StandardCharsets;
+import java.util.IllegalFormatException;
+import java.util.Locale;
+import java.util.Random;
+
+/**
+ * Default {@link IMessageFormatter}: expands printf-like placeholders found in a UTF-8
+ * payload template.
+ *
+ * <p>Supported placeholders:
+ * <ul>
+ *   <li>{@code %p} - the producer name (left as-is when the name is null or empty)</li>
+ *   <li>{@code %i} - the message id passed to {@link #formatMessage}</li>
+ *   <li>{@code %t} - the value of {@link System#nanoTime()} at formatting time</li>
+ *   <li>{@code %[size]f}, {@code %[size]l}, {@code %[size]d} - a random float, long or int</li>
+ *   <li>{@code %[size]s} - a random string of lower-case ASCII letters</li>
+ * </ul>
+ *
+ * <p>{@code size} is an optional number made of digits, {@code '.'} and {@code '-'}. For the
+ * numeric placeholders the integer part of its absolute value is the power of ten bounding the
+ * generated value, and a negative size allows negative values. For {@code %f} the size is also
+ * used as the {@code width.precision} of the printed number. For {@code %s} it is the string
+ * length. Any {@code %} sequence that is not one of the placeholders above is copied to the
+ * output unchanged.
+ */
+public class DefaultMessageFormatter implements IMessageFormatter {
+    /** Length of the string generated for a bare {@code %s}. */
+    private static final int DEFAULT_STRING_LENGTH = 20;
+
+    Random r = new Random();
+
+    /**
+     * Expands every supported placeholder in {@code message}.
+     *
+     * @param producerName value substituted for {@code %p}; when null or empty, {@code %p} is kept
+     * @param msgId value substituted for {@code %i}
+     * @param message UTF-8 encoded template
+     * @return the expanded template, UTF-8 encoded
+     */
+    @Override
+    public byte[] formatMessage(String producerName, long msgId, byte[] message) {
+        String sMessage = new String(message, StandardCharsets.UTF_8);
+        // String.replace substitutes literally; the regex-based replaceAll would interpret
+        // '$' and '\' inside the producer name as group references / escapes.
+        if (producerName != null && !producerName.isEmpty()) {
+            sMessage = sMessage.replace("%p", producerName);
+        }
+        sMessage = sMessage.replace("%i", String.valueOf(msgId));
+        sMessage = sMessage.replace("%t", String.valueOf(System.nanoTime()));
+
+        int length = sMessage.length();
+        StringBuilder formatted = new StringBuilder(length);
+        int copiedUpTo = 0;
+        // indexOf returns 0 for a placeholder at the very start, so the loop must accept 0.
+        int idx = sMessage.indexOf('%');
+        while (idx >= 0) {
+            int end = idx + 1;
+            while (end < length && isSizeChar(sMessage.charAt(end))) {
+                end++;
+            }
+            // 'end' now points at the conversion character, or past the end of the template.
+            String value = end < length
+                    ? randomValue(sMessage.charAt(end), sMessage.substring(idx + 1, end))
+                    : null;
+            if (value == null) {
+                // Not a placeholder (e.g. "100%" or "%q"): keep the text and keep scanning,
+                // always moving forward so the loop terminates.
+                idx = sMessage.indexOf('%', idx + 1);
+                continue;
+            }
+            formatted.append(sMessage, copiedUpTo, idx).append(value);
+            copiedUpTo = end + 1;
+            idx = sMessage.indexOf('%', copiedUpTo);
+        }
+        formatted.append(sMessage, copiedUpTo, length);
+        return formatted.toString().getBytes(StandardCharsets.UTF_8);
+    }
+
+    /** Tells whether {@code c} may appear in the size part of a placeholder. */
+    private static boolean isSizeChar(char c) {
+        return Character.isDigit(c) || c == '.' || c == '-';
+    }
+
+    /**
+     * Produces the random replacement for one placeholder.
+     *
+     * @param conversion the character following the size ({@code f}, {@code l}, {@code d}, {@code s})
+     * @param sizeText the text between {@code %} and the conversion character, possibly empty
+     * @return the replacement text, or {@code null} when the conversion is unknown or the size
+     *         is not a parsable number
+     */
+    private String randomValue(char conversion, String sizeText) {
+        float size = 0;
+        if (!sizeText.isEmpty()) {
+            try {
+                size = Float.parseFloat(sizeText);
+            } catch (NumberFormatException e) {
+                // e.g. "%-d" or "%1.2.3f": treat it as ordinary text
+                return null;
+            }
+        }
+        switch (conversion) {
+            case 'f':
+                return getFloatValue(size);
+            case 'l':
+                return getLongValue(size);
+            case 'd':
+                return getIntValue(size);
+            case 's':
+                return getStringValue(size);
+            default:
+                return null;
+        }
+    }
+
+    /**
+     * Returns a random float in {@code [0, 10^mag)} where {@code mag} is the integer part of
+     * {@code |size|}. For a negative size the value is negated when its integer part is odd.
+     */
+    private float nextScaledFloat(float size) {
+        float f = r.nextFloat();
+        int mag = (int) Math.abs(size);
+        f = f * (float) Math.pow(10, mag);
+        if (size < 0 && ((int) f) % 2 == 1) {
+            return f * -1;
+        }
+        return f;
+    }
+
+    /**
+     * Returns {@code (int) size} random lower-case letters, or {@value #DEFAULT_STRING_LENGTH}
+     * letters when no size was given. A negative size yields an empty string.
+     */
+    private String getStringValue(float size) {
+        int count = size == 0 ? DEFAULT_STRING_LENGTH : (int) size;
+        StringBuilder result = new StringBuilder(Math.max(count, 0));
+        for (int i = 0; i < count; i++) {
+            result.append((char) ('a' + r.nextInt(26)));
+        }
+        return result.toString();
+    }
+
+    /**
+     * Returns a random float as text. Without a size this is a plain value in {@code [0, 1)};
+     * otherwise the scaled value is printed with {@code size} as its {@code width.precision}.
+     */
+    private String getFloatValue(float size) {
+        if (size == 0) {
+            return String.valueOf(r.nextFloat());
+        }
+        float value = nextScaledFloat(size);
+        try {
+            // Locale.ROOT keeps '.' as the decimal separator so generated JSON stays valid.
+            return String.format(Locale.ROOT, "%" + size + "f", value);
+        } catch (IllegalFormatException e) {
+            // Sizes such as 0.5 turn into "%0.5f", which java.util.Formatter rejects
+            // (zero-padding flag without a width); fall back to the plain representation.
+            return String.valueOf(value);
+        }
+    }
+
+    /** Returns a random int as text: any int without a size, otherwise the scaled value. */
+    private String getIntValue(float size) {
+        if (size == 0) {
+            return String.valueOf(r.nextInt());
+        }
+        return String.valueOf((int) nextScaledFloat(size));
+    }
+
+    /** Returns a random long as text: any long without a size, otherwise the scaled value. */
+    private String getLongValue(float size) {
+        if (size == 0) {
+            return String.valueOf(r.nextLong());
+        }
+        return String.valueOf((long) nextScaledFloat(size));
+    }
+}
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/IMessageFormatter.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/IMessageFormatter.java
new file mode 100644
index 0000000..e80ae50
--- /dev/null
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/IMessageFormatter.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+/**
+ * Hook that rewrites a payload before it is published.
+ *
+ * <p>PerformanceProducer loads the implementation named by {@code --format-class} when
+ * {@code --format-payload} is set, and passes every payload read from the payload file
+ * through {@link #formatMessage} before using the result as the message data.
+ */
+public interface IMessageFormatter {
+    /**
+     * Builds the bytes to send from a payload template.
+     *
+     * @param producerName the configured producer name; PerformanceProducer passes the
+     *                     {@code --producer-name} value, which is null when the option is not set
+     * @param msg the message id; PerformanceProducer passes its running count of sent messages
+     * @param message the payload template
+     * @return the payload to publish
+     */
+    byte[] formatMessage(String producerName, long msg, byte[] message);
+}
\ No newline at end of file
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index ece6853..94aef5f 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -94,6 +94,8 @@ public class PerformanceProducer {
     private static Recorder recorder = new 
Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
     private static Recorder cumulativeRecorder = new 
Recorder(TimeUnit.SECONDS.toMicros(120000), 5);
 
+    private static IMessageFormatter messageFormatter = null;
+
     @Parameters(commandDescription = "Test pulsar producer performance.")
     static class Arguments {
 
@@ -121,6 +123,15 @@ public class PerformanceProducer {
         @Parameter(names = { "-n", "--num-producers" }, description = "Number 
of producers (per topic)")
         public int numProducers = 1;
 
+        @Parameter(names = {"--separator"}, description = "Separator between 
the topic and topic number")
+        public String separator = "-";
+
+        @Parameter(names = {"--send-timeout"}, description = "Set the 
sendTimeout value default 0 to keep compatibility with previous version of 
pulsar-perf")
+        public int sendTimeout = 0;
+
+        @Parameter(names = { "-pn", "--producer-name" }, description = 
"Producer Name")
+        public String producerName = null;
+
         @Parameter(names = { "-u", "--service-url" }, description = "Pulsar 
Service URL")
         public String serviceURL;
 
@@ -229,6 +240,13 @@ public class PerformanceProducer {
 
         @Parameter(names = { "-am", "--access-mode" }, description = "Producer 
access mode")
         public ProducerAccessMode producerAccessMode = 
ProducerAccessMode.Shared;
+
+        @Parameter(names = { "-fp", "--format-payload" },
+                description = "Format %i as a message index in the stream from 
producer and/or %t as the timestamp nanoseconds.")
+        public boolean formatPayload = false;
+
+        @Parameter(names = {"-fc", "--format-class"}, description="Custom 
Formatter class name")
+        public String formatterClass = 
"org.apache.pulsar.testclient.DefaultMessageFormatter";
     }
 
     public static void main(String[] args) throws Exception {
@@ -256,7 +274,7 @@ public class PerformanceProducer {
                 String prefixTopicName = arguments.topics.get(0);
                 List<String> defaultTopics = Lists.newArrayList();
                 for (int i = 0; i < arguments.numTopics; i++) {
-                    defaultTopics.add(String.format("%s-%d", prefixTopicName, 
i));
+                    defaultTopics.add(String.format("%s%s%d", prefixTopicName, 
arguments.separator, i));
                 }
                 arguments.topics = defaultTopics;
             } else {
@@ -332,6 +350,10 @@ public class PerformanceProducer {
             for (String payload : payloadList) {
                 payloadByteList.add(payload.getBytes(StandardCharsets.UTF_8));
             }
+
+            if (arguments.formatPayload) {
+                messageFormatter = 
getMessageFormatter(arguments.formatterClass);
+            }
         } else {
             for (int i = 0; i < payloadBytes.length; ++i) {
                 payloadBytes[i] = (byte) (random.nextInt(26) + 65);
@@ -388,6 +410,7 @@ public class PerformanceProducer {
             executor.submit(() -> {
                 log.info("Started performance test thread {}", threadIdx);
                 runProducer(
+                    threadIdx,
                     arguments,
                     numMessagesPerThread,
                     msgRatePerThread,
@@ -453,7 +476,18 @@ public class PerformanceProducer {
         }
     }
 
-    private static void runProducer(Arguments arguments,
+    /**
+     * Instantiates the message formatter with the given class name through its no-argument
+     * constructor, using the class loader of {@code PerformanceProducer}.
+     *
+     * @param formatterClass fully qualified name of a class implementing {@link IMessageFormatter}
+     * @return a new formatter instance, or {@code null} when the class cannot be loaded,
+     *         instantiated or cast to {@link IMessageFormatter}
+     */
+    static IMessageFormatter getMessageFormatter(String formatterClass) {
+        try {
+            ClassLoader classLoader = PerformanceProducer.class.getClassLoader();
+            Class<?> clz = classLoader.loadClass(formatterClass);
+            // Class.newInstance() is deprecated; go through the no-arg constructor instead.
+            return (IMessageFormatter) clz.getDeclaredConstructor().newInstance();
+        } catch (Exception e) {
+            // Callers treat null as "no formatter", so report why instead of failing silently.
+            log.warn("Unable to create message formatter {}; payloads will not be formatted",
+                    formatterClass, e);
+            return null;
+        }
+    }
+
+    private static void runProducer(int producerId,
+                                    Arguments arguments,
                                     long numMessages,
                                     int msgRate,
                                     List<byte[]> payloadByteList,
@@ -486,7 +520,7 @@ public class PerformanceProducer {
 
             client = clientBuilder.build();
             ProducerBuilder<byte[]> producerBuilder = client.newProducer() //
-                    .sendTimeout(0, TimeUnit.SECONDS) //
+                    .sendTimeout(arguments.sendTimeout, TimeUnit.SECONDS) //
                     .compressionType(arguments.compression) //
                     .maxPendingMessages(arguments.maxOutstanding) //
                     
.maxPendingMessagesAcrossPartitions(arguments.maxPendingMessagesAcrossPartitions)
@@ -494,6 +528,11 @@ public class PerformanceProducer {
                     // enable round robin message routing if it is a 
partitioned topic
                     
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition);
 
+            if (arguments.producerName != null) {
+                String producerName = String.format("%s%s%d", 
arguments.producerName, arguments.separator, producerId);
+                producerBuilder.producerName(producerName);
+            }
+
             if (arguments.batchTimeMillis == 0.0 && arguments.batchMaxMessages 
== 0) {
                 producerBuilder.enableBatching(false);
             } else {
@@ -516,6 +555,7 @@ public class PerformanceProducer {
             }
 
             for (int i = 0; i < arguments.numTopics; i++) {
+
                 String topic = arguments.topics.get(i);
                 log.info("Adding {} publishers on topic {}", 
arguments.numProducers, topic);
 
@@ -580,7 +620,12 @@ public class PerformanceProducer {
                     byte[] payloadData;
 
                     if (arguments.payloadFilename != null) {
-                        payloadData = 
payloadByteList.get(random.nextInt(payloadByteList.size()));
+                        if (messageFormatter != null) {
+                            payloadData = 
messageFormatter.formatMessage(arguments.producerName, totalSent,
+                                    
payloadByteList.get(random.nextInt(payloadByteList.size())));
+                        } else {
+                            payloadData = 
payloadByteList.get(random.nextInt(payloadByteList.size()));
+                        }
                     } else {
                         payloadData = payloadBytes;
                     }
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
index 716a05a..01588c6 100644
--- 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceProducerTest.java
@@ -176,4 +176,17 @@ public class PerformanceProducerTest extends 
MockedPulsarServiceBaseTest {
         thread.join();
         Assert.assertEquals(10, 
pulsar.getAdminClient().topics().getPartitionedTopicMetadata(topic).partitions);
     }
+
+    /** An unknown formatter class name must yield {@code null} rather than an exception. */
+    @Test
+    public void testNotExistIMessageFormatter() {
+        String missingClass = "org.apache.pulsar.testclient.NonExistentFormatter";
+        Assert.assertNull(PerformanceProducer.getMessageFormatter(missingClass));
+    }
+
+    /** The default formatter class name must resolve to a {@link DefaultMessageFormatter}. */
+    @Test
+    public void testDefaultIMessageFormatter() {
+        String defaultClass = "org.apache.pulsar.testclient.DefaultMessageFormatter";
+        IMessageFormatter loaded = PerformanceProducer.getMessageFormatter(defaultClass);
+        boolean isDefaultFormatter = loaded instanceof DefaultMessageFormatter;
+        Assert.assertTrue(isDefaultFormatter);
+    }
+
 }
diff --git 
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
new file mode 100644
index 0000000..2d5c4b9
--- /dev/null
+++ 
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/TestDefaultMessageFormatter.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.nio.charset.StandardCharsets;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/** Tests for {@link DefaultMessageFormatter}. */
+public class TestDefaultMessageFormatter {
+
+    /**
+     * Expands a JSON template that uses every placeholder kind and checks that the result is
+     * valid JSON whose fields are within the ranges the placeholders can produce.
+     *
+     * @throws Exception if the formatted payload is not parsable JSON, which fails the test
+     *                   with the parser's own error as the cause
+     */
+    @Test
+    public void testFormatMessage() throws Exception {
+        String producerName = "producer-1";
+        long msgId = 3;
+        String template = "{ \"producer\": \"%p\", \"msgId\": %i, \"nanoTime\": %t,"
+                + " \"float1\": %5.2f, \"float2\": %-5.2f, \"long1\": %12l, \"long2\": %l,"
+                + " \"int1\": %d, \"int2\": %1d , \"long3\": %5l,  \"str\": \"%5s\" }";
+        byte[] formatted = new DefaultMessageFormatter()
+                .formatMessage(producerName, msgId, template.getBytes(StandardCharsets.UTF_8));
+        String jsonString = new String(formatted, StandardCharsets.UTF_8);
+
+        JsonNode obj = new ObjectMapper().readValue(jsonString, JsonNode.class);
+
+        String prod = obj.get("producer").asText();
+        long mid = obj.get("msgId").asLong();
+        float f1 = obj.get("float1").floatValue();
+        float f2 = obj.get("float2").floatValue();
+        long l1 = obj.get("long1").asLong();
+        long l2 = obj.get("long2").asLong();
+        long l3 = obj.get("long3").asLong();
+        int i1 = obj.get("int1").asInt();
+        int i2 = obj.get("int2").asInt();
+        String str = obj.get("str").asText();
+
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(prod, producerName);
+        Assert.assertEquals(mid, msgId);
+        // System.nanoTime() has an arbitrary origin and may be negative, so only the type is checked.
+        Assert.assertTrue(obj.get("nanoTime").isIntegralNumber());
+        Assert.assertNotEquals(f1, f2);
+        Assert.assertNotEquals(l1, l2);
+        Assert.assertNotEquals(i1, i2);
+        // %5l yields 0..99999 and %1d yields 0..9; zero is a legitimate value for both.
+        Assert.assertTrue(0 <= l3 && l3 <= 99999);
+        Assert.assertTrue(0 <= i2 && i2 < 10);
+        // Rounding to two decimals can reach the bound itself, hence the inclusive comparison.
+        Assert.assertTrue(-100000 <= f2 && f2 <= 100000);
+        Assert.assertEquals(str.length(), 5);
+    }
+}

Reply via email to