This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf208d2316 [feat][test] Support delay messages in a random number of 
seconds by the range (#20717)
4bf208d2316 is described below

commit 4bf208d231654938c40d95f0d040d5c3c9ddccd4
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jul 6 06:26:02 2023 +0800

    [feat][test] Support delay messages in a random number of seconds by the 
range (#20717)
---
 .../pulsar/testclient/PerformanceProducer.java     | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 389a6af4aaa..c1d4f54e4d7 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -18,16 +18,19 @@
  */
 package org.apache.pulsar.testclient;
 
+import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
 import static 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
 import static 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Range;
 import com.google.common.util.concurrent.RateLimiter;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.FileOutputStream;
@@ -200,6 +203,11 @@ public class PerformanceProducer {
                 "--delay" }, description = "Mark messages with a given delay 
in seconds")
         public long delay = 0;
 
+        @Parameter(names = { "-dr", "--delay-range"}, description = "Mark 
messages with a given delay by a random"
+                + " number of seconds. this value between the specified origin 
(inclusive) and the specified bound"
+                + " (exclusive). e.g. \"1,300\"", converter = 
RangeConvert.class)
+        public Range<Long> delayRange = null;
+
         @Parameter(names = { "-set",
                 "--set-event-time" }, description = "Set the eventTime on 
messages")
         public boolean setEventTime = false;
@@ -622,6 +630,10 @@ public class PerformanceProducer {
                     }
                     if (arguments.delay > 0) {
                         messageBuilder.deliverAfter(arguments.delay, 
TimeUnit.SECONDS);
+                    } else if (arguments.delayRange != null) {
+                        final long deliverAfter = ThreadLocalRandom.current()
+                                
.nextLong(arguments.delayRange.lowerEndpoint(), 
arguments.delayRange.upperEndpoint());
+                        messageBuilder.deliverAfter(deliverAfter, 
TimeUnit.SECONDS);
                     }
                     if (arguments.setEventTime) {
                         messageBuilder.eventTime(System.currentTimeMillis());
@@ -783,4 +795,21 @@ public class PerformanceProducer {
     public enum MessageKeyGenerationMode {
         autoIncrement, random
     }
+
+    /**
+     * JCommander converter for the {@code --delay-range} option: turns a string of the form
+     * {@code <origin>,<bound>} (for example {@code 1,300}) into the half-open range
+     * {@code [origin, bound)}.
+     *
+     * <p>One pair of enclosing brackets or quotes (for example {@code [1,300)}) is tolerated and
+     * stripped before parsing. The origin must be strictly less than the bound, because the
+     * producer draws each delay with {@code ThreadLocalRandom.nextLong(origin, bound)}, which
+     * rejects an empty or inverted interval.
+     */
+    static class RangeConvert implements IStringConverter<Range<Long>> {
+        /**
+         * Parses {@code rangeStr} into a closed-open range of seconds.
+         *
+         * @param rangeStr the raw option value, e.g. {@code 1,300}
+         * @return the range {@code [origin, bound)}
+         * @throws IllegalArgumentException if the value is null, is not two comma-separated
+         *         integers, or the origin is not less than the bound
+         */
+        @Override
+        public Range<Long> convert(String rangeStr) {
+            try {
+                requireNonNull(rangeStr);
+                String body = rangeStr.trim();
+                // Strip a single pair of enclosing brackets/quotes only when they are actually
+                // present; a bare "10,300" must keep its first and last digits.
+                if (body.length() >= 2
+                        && "[(\"'".indexOf(body.charAt(0)) >= 0
+                        && "])\"'".indexOf(body.charAt(body.length() - 1)) >= 0) {
+                    body = body.substring(1, body.length() - 1);
+                }
+                // Limit -1 keeps trailing empty fields so that "1,300," is rejected.
+                final String[] facts = body.split(",", -1);
+                if (facts.length != 2) {
+                    throw new IllegalArgumentException("expected exactly two comma-separated values");
+                }
+                final long min = Long.parseLong(facts[0].trim());
+                final long max = Long.parseLong(facts[1].trim());
+                // Fail at parse time instead of on the first send: an empty range cannot be sampled.
+                if (min >= max) {
+                    throw new IllegalArgumentException("origin must be less than bound");
+                }
+                return Range.closedOpen(min, max);
+            } catch (RuntimeException ex) {
+                // Keep the original user-facing message, but preserve the cause for diagnosis.
+                throw new IllegalArgumentException("Unknown delay range interval,"
+                        + " the format should be \"<origin>,<bound>\". error message: " + rangeStr, ex);
+            }
+        }
+    }
+
 }

Reply via email to