This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9fc5185  Add subscribe initial position for consumer cli. (#6442)
9fc5185 is described below

commit 9fc51856a8c8dca6f1b668ae73d01514f153d895
Author: Fangbin Sun <[email protected]>
AuthorDate: Sun Mar 8 17:26:51 2020 +0800

    Add subscribe initial position for consumer cli. (#6442)
    
    ### Motivation
    
    In some case, users expect to consume messages from beginning similar to 
the option `--from-beginning` of kafka consumer CLI.
    
    ### Modifications
    
    Add `--subscription-position` for `pulsar-client` and `pulsar-perf`.
---
 .../main/java/org/apache/pulsar/client/cli/CmdConsume.java    | 11 ++++++++---
 .../org/apache/pulsar/testclient/PerformanceConsumer.java     |  7 ++++++-
 site2/docs/reference-cli-tools.md                             |  5 ++++-
 3 files changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 895f595..673a45e 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -50,6 +50,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
@@ -78,9 +79,12 @@ public class CmdConsume {
     @Parameter(description = "TopicName", required = true)
     private List<String> mainOptions = new ArrayList<String>();
 
-    @Parameter(names = { "-t", "--subscription-type" }, description = 
"Subscription type: Exclusive, Shared, Failover.")
+    @Parameter(names = { "-t", "--subscription-type" }, description = 
"Subscription type.")
     private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
 
+    @Parameter(names = { "-p", "--subscription-position" }, description = 
"Subscription position.")
+    private SubscriptionInitialPosition subscriptionInitialPosition = 
SubscriptionInitialPosition.Latest;
+
     @Parameter(names = { "-s", "--subscription-name" }, required = true, 
description = "Subscription name.")
     private String subscriptionName;
 
@@ -95,7 +99,7 @@ public class CmdConsume {
             + "value 0 means to consume messages as fast as possible.")
     private double consumeRate = 0;
 
-    @Parameter(names = { "--regex" }, description = "Indicate thetopic name is 
a regex pattern")
+    @Parameter(names = { "--regex" }, description = "Indicate the topic name 
is a regex pattern")
     private boolean isRegex = false;
 
     private ClientBuilder clientBuilder;
@@ -182,7 +186,8 @@ public class CmdConsume {
             PulsarClient client = clientBuilder.build();
             ConsumerBuilder<byte[]> builder = client.newConsumer()
                     .subscriptionName(this.subscriptionName)
-                    .subscriptionType(subscriptionType);
+                    .subscriptionType(subscriptionType)
+                    .subscriptionInitialPosition(subscriptionInitialPosition);
 
             if (isRegex) {
                 builder.topicsPattern(Pattern.compile(topic));
diff --git 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 5c569fd..cf38f27 100644
--- 
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ 
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.EncryptionKeyInfo;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.slf4j.Logger;
@@ -86,9 +87,12 @@ public class PerformanceConsumer {
         @Parameter(names = { "-s", "--subscriber-name" }, description = 
"Subscriber name prefix")
         public String subscriberName = "sub";
 
-        @Parameter(names = { "-st", "--subscription-type" }, description = 
"Subscriber name prefix")
+        @Parameter(names = { "-st", "--subscription-type" }, description = 
"Subscription type")
         public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
 
+        @Parameter(names = { "-sp", "--subscription-position" }, description = 
"Subscription position")
+        private SubscriptionInitialPosition subscriptionInitialPosition = 
SubscriptionInitialPosition.Latest;
+
         @Parameter(names = { "-r", "--rate" }, description = "Simulate a slow 
message consumer (rate in msg/s)")
         public double rate = 0;
 
@@ -257,6 +261,7 @@ public class PerformanceConsumer {
                 .receiverQueueSize(arguments.receiverQueueSize) //
                 
.acknowledgmentGroupTime(arguments.acknowledgmentsGroupingDelayMillis, 
TimeUnit.MILLISECONDS) //
                 .subscriptionType(arguments.subscriptionType)
+                
.subscriptionInitialPosition(arguments.subscriptionInitialPosition)
                 .replicateSubscriptionState(arguments.replicatedSubscription);
 
         if (arguments.encKeyName != null) {
diff --git a/site2/docs/reference-cli-tools.md 
b/site2/docs/reference-cli-tools.md
index cd737cd..9a87c8c 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -325,8 +325,10 @@ Options
 |`--hex`|Display binary messages in hexadecimal format.|false|
 |`-n`, `--num-messages`|Number of messages to consume, 0 means to consume 
forever.|1|
 |`-r`, `--rate`|Rate (in messages per second) at which to consume; a value 0 
means to consume messages as fast as possible|0.0|
+|`--regex`|Indicate the topic name is a regex pattern|false|
 |`-s`, `--subscription-name`|Subscription name||
 |`-t`, `--subscription-type`|The type of the subscription. Possible values: 
Exclusive, Shared, Failover, Key_Shared.|Exclusive|
+|`-p`, `--subscription-position`|The position of the subscription. Possible 
values: Latest, Earliest.|Latest|
 
 
 
@@ -426,7 +428,8 @@ Options
 |`-u`, `--service-url`|Pulsar service URL||
 |`-i`, `--stats-interval-seconds`|Statistics interval seconds. If 0, 
statistics will be disabled|0|
 |`-s`, `--subscriber-name`|Subscriber name prefix|sub|
-|`-st`, `--subscription-type`|Subscriber name prefix. Possible values are 
Exclusive, Shared, Failover.|Exclusive|
+|`-st`, `--subscription-type`|Subscriber type. Possible values are Exclusive, 
Shared, Failover, Key_Shared.|Exclusive|
+|`-sp`, `--subscription-position`|Subscriber position. Possible values are 
Latest, Earliest.|Latest|
 |`--trust-cert-file`|Path for the trusted TLS certificate file||
 
 

Reply via email to