This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9fc5185 Add subscribe initial position for consumer cli. (#6442)
9fc5185 is described below
commit 9fc51856a8c8dca6f1b668ae73d01514f153d895
Author: Fangbin Sun <[email protected]>
AuthorDate: Sun Mar 8 17:26:51 2020 +0800
Add subscribe initial position for consumer cli. (#6442)
### Motivation
In some case, users expect to consume messages from beginning similar to
the option `--from-beginning` of kafka consumer CLI.
### Modifications
Add `--subscription-position` for `pulsar-client` and `pulsar-perf`.
---
.../main/java/org/apache/pulsar/client/cli/CmdConsume.java | 11 ++++++++---
.../org/apache/pulsar/testclient/PerformanceConsumer.java | 7 ++++++-
site2/docs/reference-cli-tools.md | 5 ++++-
3 files changed, 18 insertions(+), 5 deletions(-)
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 895f595..673a45e 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@@ -78,9 +79,12 @@ public class CmdConsume {
@Parameter(description = "TopicName", required = true)
private List<String> mainOptions = new ArrayList<String>();
- @Parameter(names = { "-t", "--subscription-type" }, description =
"Subscription type: Exclusive, Shared, Failover.")
+ @Parameter(names = { "-t", "--subscription-type" }, description =
"Subscription type.")
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+ @Parameter(names = { "-p", "--subscription-position" }, description =
"Subscription position.")
+ private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.Latest;
+
@Parameter(names = { "-s", "--subscription-name" }, required = true,
description = "Subscription name.")
private String subscriptionName;
@@ -95,7 +99,7 @@ public class CmdConsume {
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;
- @Parameter(names = { "--regex" }, description = "Indicate thetopic name is
a regex pattern")
+ @Parameter(names = { "--regex" }, description = "Indicate the topic name
is a regex pattern")
private boolean isRegex = false;
private ClientBuilder clientBuilder;
@@ -182,7 +186,8 @@ public class CmdConsume {
PulsarClient client = clientBuilder.build();
ConsumerBuilder<byte[]> builder = client.newConsumer()
.subscriptionName(this.subscriptionName)
- .subscriptionType(subscriptionType);
+ .subscriptionType(subscriptionType)
+ .subscriptionInitialPosition(subscriptionInitialPosition);
if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 5c569fd..cf38f27 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.EncryptionKeyInfo;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
@@ -86,9 +87,12 @@ public class PerformanceConsumer {
@Parameter(names = { "-s", "--subscriber-name" }, description =
"Subscriber name prefix")
public String subscriberName = "sub";
- @Parameter(names = { "-st", "--subscription-type" }, description =
"Subscriber name prefix")
+ @Parameter(names = { "-st", "--subscription-type" }, description =
"Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+ @Parameter(names = { "-sp", "--subscription-position" }, description =
"Subscription position")
+ private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.Latest;
+
@Parameter(names = { "-r", "--rate" }, description = "Simulate a slow
message consumer (rate in msg/s)")
public double rate = 0;
@@ -257,6 +261,7 @@ public class PerformanceConsumer {
.receiverQueueSize(arguments.receiverQueueSize) //
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis,
TimeUnit.MILLISECONDS) //
.subscriptionType(arguments.subscriptionType)
+
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
.replicateSubscriptionState(arguments.replicatedSubscription);
if (arguments.encKeyName != null) {
diff --git a/site2/docs/reference-cli-tools.md
b/site2/docs/reference-cli-tools.md
index cd737cd..9a87c8c 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -325,8 +325,10 @@ Options
|`--hex`|Display binary messages in hexadecimal format.|false|
|`-n`, `--num-messages`|Number of messages to consume, 0 means to consume
forever.|1|
|`-r`, `--rate`|Rate (in messages per second) at which to consume; a value 0
means to consume messages as fast as possible|0.0|
+|`--regex`|Indicate the topic name is a regex pattern|false|
|`-s`, `--subscription-name`|Subscription name||
|`-t`, `--subscription-type`|The type of the subscription. Possible values:
Exclusive, Shared, Failover, Key_Shared.|Exclusive|
+|`-p`, `--subscription-position`|The position of the subscription. Possible
values: Latest, Earliest.|Latest|
@@ -426,7 +428,8 @@ Options
|`-u`, `--service-url`|Pulsar service URL||
|`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0,
statistics will be disabled|0|
|`-s`, `--subscriber-name`|Subscriber name prefix|sub|
-|`-st`, `--subscription-type`|Subscriber name prefix. Possible values are
Exclusive, Shared, Failover.|Exclusive|
+|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive,
Shared, Failover, Key_Shared.|Exclusive|
+|`-sp`, `--subscription-position`|Subscriber position. Possible values are
Latest, Earliest.|Latest|
|`--trust-cert-file`|Path for the trusted TLS certificate file||