This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch test-release in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 171744886f8f1530027b32b0c07c960ed5fd2258 Author: 翊名 <[email protected]> AuthorDate: Tue Jan 7 20:50:13 2020 +0800 feat(pull_consumer) refactor the consumer offset update logic --- .../org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java | 3 ++- .../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index fad0b4f..4ab776a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -177,7 +177,8 @@ public class AssignedMessageQueue { } } - public Set<MessageQueue> getAssignedMessageQueues() { + + public Set<MessageQueue> getAssignedMessageQueue() { return this.assignedMessageQueueState.keySet(); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 8483da6..4b732a7 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -908,7 +908,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); mqs.addAll(allocateMq); } else if (this.subscriptionType == SubscriptionType.ASSIGN) { - Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues(); + Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueue(); mqs.addAll(assignedMessageQueue); } this.offsetStore.persistAll(mqs);
