This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new dc815cc [ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
dc815cc is described below
commit dc815cc581f9e9dd36efa926fc7087ff1ecbf1a6
Author: Heng Du <[email protected]>
AuthorDate: Fri Apr 3 16:30:39 2020 +0800
[ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
[ISSUE #1912]Polish the committed offset logic for the lite pull consumer.
---
.../apache/rocketmq/client/consumer/DefaultLitePullConsumer.java | 6 +++---
.../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +-
2 files changed, 4 insertions(+), 4 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index ef76cfd..6718eb5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -279,7 +279,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public Long committed(MessageQueue messageQueue) throws MQClientException {
- return this.defaultLitePullConsumerImpl.committed(messageQueue);
+ return this.defaultLitePullConsumerImpl.committed(queueWithNamespace(messageQueue));
}
@Override
@@ -289,12 +289,12 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
@Override
public void seekToBegin(MessageQueue messageQueue) throws MQClientException {
- this.defaultLitePullConsumerImpl.seekToBegin(messageQueue);
+ this.defaultLitePullConsumerImpl.seekToBegin(queueWithNamespace(messageQueue));
}
@Override
public void seekToEnd(MessageQueue messageQueue) throws MQClientException {
- this.defaultLitePullConsumerImpl.seekToEnd(messageQueue);
+ this.defaultLitePullConsumerImpl.seekToEnd(queueWithNamespace(messageQueue));
}
@Override
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 8ad7a6b..f54078f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -631,7 +631,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
public long committed(MessageQueue messageQueue) throws MQClientException {
checkServiceState();
- long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.READ_FROM_STORE);
+ long offset = this.offsetStore.readOffset(messageQueue, ReadOffsetType.MEMORY_FIRST_THEN_STORE);
if (offset == -2)
throw new MQClientException("Fetch consume offset from broker exception", null);
return offset;