This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4bf208d2316 [feat][test] Support delay messages in a random number of
seconds by the range (#20717)
4bf208d2316 is described below
commit 4bf208d231654938c40d95f0d040d5c3c9ddccd4
Author: Qiang Zhao <[email protected]>
AuthorDate: Thu Jul 6 06:26:02 2023 +0800
[feat][test] Support delay messages in a random number of seconds by the
range (#20717)
---
.../pulsar/testclient/PerformanceProducer.java | 29 ++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
index 389a6af4aaa..c1d4f54e4d7 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java
@@ -18,16 +18,19 @@
*/
package org.apache.pulsar.testclient;
+import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static
org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_BATCHING_MAX_MESSAGES;
import static
org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES;
import static
org.apache.pulsar.client.impl.conf.ProducerConfigurationData.DEFAULT_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS;
+import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
+import com.google.common.collect.Range;
import com.google.common.util.concurrent.RateLimiter;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.FileOutputStream;
@@ -200,6 +203,11 @@ public class PerformanceProducer {
"--delay" }, description = "Mark messages with a given delay
in seconds")
public long delay = 0;
+ @Parameter(names = { "-dr", "--delay-range"}, description = "Mark
messages with a given delay by a random"
+ + " number of seconds. this value between the specified origin
(inclusive) and the specified bound"
+ + " (exclusive). e.g. \"1,300\"", converter =
RangeConvert.class)
+ public Range<Long> delayRange = null;
+
@Parameter(names = { "-set",
"--set-event-time" }, description = "Set the eventTime on
messages")
public boolean setEventTime = false;
@@ -622,6 +630,10 @@ public class PerformanceProducer {
}
if (arguments.delay > 0) {
messageBuilder.deliverAfter(arguments.delay,
TimeUnit.SECONDS);
+ } else if (arguments.delayRange != null) {
+ final long deliverAfter = ThreadLocalRandom.current()
+
.nextLong(arguments.delayRange.lowerEndpoint(),
arguments.delayRange.upperEndpoint());
+ messageBuilder.deliverAfter(deliverAfter,
TimeUnit.SECONDS);
}
if (arguments.setEventTime) {
messageBuilder.eventTime(System.currentTimeMillis());
@@ -783,4 +795,21 @@ public class PerformanceProducer {
public enum MessageKeyGenerationMode {
autoIncrement, random
}
+
+ static class RangeConvert implements IStringConverter<Range<Long>> {
+ @Override
+ public Range<Long> convert(String rangeStr) {
+ try {
+ requireNonNull(rangeStr);
+ final String[] facts = rangeStr.substring(1, rangeStr.length()
- 1).split(",");
+ final long min = Long.parseLong(facts[0].trim());
+ final long max = Long.parseLong(facts[1].trim());
+ return Range.closedOpen(min, max);
+ } catch (Throwable ex) {
+ throw new IllegalArgumentException("Unknown delay range
interval,"
+ + " the format should be \"<origin>,<bound>\". error
message: " + rangeStr);
+ }
+ }
+ }
+
}