This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch test-release
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 3cb8d352ca0853b6d951fb889f23cee4cd9f931f
Author: 翊名 <[email protected]>
AuthorDate: Mon Nov 25 16:14:31 2019 +0800

    feat(PullConsumer) support begin/end seek support for  pull consumer
---
 .../client/consumer/DefaultLitePullConsumer.java   | 28 ++++++++++++++-----
 .../rocketmq/client/consumer/LitePullConsumer.java | 23 ++++++++++++++++
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 32 ++++++++++++++++------
 3 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 99976d5..09f7e20 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -18,7 +18,6 @@ package org.apache.rocketmq.client.consumer;
 
 import java.util.Collection;
 import java.util.List;
-
 import org.apache.rocketmq.client.ClientConfig;
 import 
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
@@ -35,12 +34,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     private final DefaultLitePullConsumerImpl defaultLitePullConsumerImpl;
 
     /**
-     * Consumers belonging to the same consumer group share a group id. The consumers in a group then
-     * divides the topic as fairly amongst themselves as possible by establishing that each queue is only
-     * consumed by a single consumer from the group. If all consumers are from the same group, it functions
-     * as a traditional message queue. Each message would be consumed by one consumer of the group only.
-     * When multiple consumer groups exist, the flow of the data consumption model aligns with the traditional
-     * publish-subscribe model. The messages are broadcast to all consumer groups.
+     * Consumers belonging to the same consumer group share a group id. The consumers in a group then divides the topic
+     * as fairly amongst themselves as possible by establishing that each queue is only consumed by a single consumer
+     * from the group. If all consumers are from the same group, it functions as a traditional message queue. Each
+     * message would be consumed by one consumer of the group only. When multiple consumer groups exist, the flow of the
+     * data consumption model aligns with the traditional publish-subscribe model. The messages are broadcast to all
+     * consumer groups.
      */
     private String consumerGroup;
 
@@ -267,6 +266,21 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
     }
 
     @Override
+    public void updateNameServerAddress(String nameServerAddress) {
+        
this.defaultLitePullConsumerImpl.updateNameServerAddr(nameServerAddress);
+    }
+
+    @Override
+    public void seekToBegin(MessageQueue messageQueue) throws 
MQClientException {
+        this.defaultLitePullConsumerImpl.seekToBegin(messageQueue);
+    }
+
+    @Override
+    public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
+        this.defaultLitePullConsumerImpl.seekToEnd(messageQueue);
+    }
+
+    @Override
     public boolean isAutoCommit() {
         return autoCommit;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
index d6e657f..ce22288 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/LitePullConsumer.java
@@ -172,4 +172,27 @@ public interface LitePullConsumer {
      */
     void registerTopicMessageQueueChangeListener(String topic,
         TopicMessageQueueChangeListener topicMessageQueueChangeListener) 
throws MQClientException;
+
+    /**
+     * Update name server addresses.
+     */
+    void updateNameServerAddress(String nameServerAddress);
+
+    /**
+     * Overrides the fetch offsets with the begin offset that the consumer will use on the next poll. If this API is
+     * invoked for the same message queue more than once, the latest offset will be used on the next poll(). Note that
+     * you may lose data if this API is arbitrarily used in the middle of consumption.
+     *
+     * @param messageQueue
+     */
+    void seekToBegin(MessageQueue messageQueue)throws MQClientException;
+
+    /**
+     * Overrides the fetch offsets with the end offset that the consumer will use on the next poll. If this API is
+     * invoked for the same message queue more than once, the latest offset will be used on the next poll(). Note that
+     * you may lose data if this API is arbitrarily used in the middle of consumption.
+     *
+     * @param messageQueue
+     */
+    void seekToEnd(MessageQueue messageQueue)throws MQClientException;
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 2c673a1..f44eea7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -16,16 +16,16 @@
  */
 package org.apache.rocketmq.client.impl.consumer;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -35,13 +35,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
-import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
+import org.apache.rocketmq.client.consumer.MessageSelector;
 import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.TopicMessageQueueChangeListener;
 import org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
@@ -164,6 +163,10 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
             throw new IllegalStateException(NOT_RUNNING_EXCEPTION_MESSAGE);
     }
 
+    public void updateNameServerAddr(String newAddresses) {
+        
this.mQClientFactory.getMQClientAPIImpl().updateNameServerAddressList(newAddresses);
+    }
+
     private synchronized void setSubscriptionType(SubscriptionType type) {
         if (this.subscriptionType == SubscriptionType.NONE)
             this.subscriptionType = type;
@@ -556,6 +559,16 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
         }
     }
 
+    public void seekToBegin(MessageQueue messageQueue) throws 
MQClientException {
+        long begin = minOffset(messageQueue);
+        this.seek(messageQueue, begin);
+    }
+
+    public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
+        long begin = maxOffset(messageQueue);
+        this.seek(messageQueue, begin);
+    }
+
     private long maxOffset(MessageQueue messageQueue) throws MQClientException 
{
         checkServiceState();
         return this.mQClientFactory.getMQAdminImpl().maxOffset(messageQueue);
@@ -764,8 +777,9 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                         subscriptionData = 
FilterAPI.buildSubscriptionData(defaultLitePullConsumer.getConsumerGroup(),
                             topic, SubscriptionData.SUB_ALL);
                     }
-
                     PullResult pullResult = pull(messageQueue, 
subscriptionData, offset, nextPullBatchSize());
+
+
                     switch (pullResult.getPullStatus()) {
                         case FOUND:
                             final Object objLock = 
messageQueueLock.fetchLockObject(messageQueue);

Reply via email to