This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new dc815cc  [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
dc815cc is described below

commit dc815cc581f9e9dd36efa926fc7087ff1ecbf1a6
Author: Heng Du <[email protected]>
AuthorDate: Fri Apr 3 16:30:39 2020 +0800

    [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
    
    [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
---
 .../apache/rocketmq/client/consumer/DefaultLitePullConsumer.java    | 6 +++---
 .../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java  | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index ef76cfd..6718eb5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
 
     @Override
     public Long committed(MessageQueue messageQueue) throws MQClientException {
-        return this.defaultLitePullConsumerImpl.committed(messageQueue);
+        return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
     }
 
     @Override
@@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
 
     @Override
     public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
-        this.defaultLitePullConsumerImpl.seekToBegin(messageQueue);
+        this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue));
     }
 
     @Override
     public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
-        this.defaultLitePullConsumerImpl.seekToEnd(messageQueue);
+        this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue));
     }
 
     @Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 8ad7a6b..f54078f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
 
     public long committed(MessageQueue messageQueue) throws MQClientException {
         checkServiceState();
-        long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
+        long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
         if (offset == -2)
             throw new MQClientException("Fetch consume offset from broker exception", null);
         return offset;

Reply via email to