This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch test-release
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9c39ae6bba97dbf84063682c86c782f602fe8c32 Author: duhenglucky <[email protected]> AuthorDate: Tue Jan 7 20:50:13 2020 +0800 feat(pull_consumer) refactor the consumer offset update logic --- .../client/consumer/DefaultLitePullConsumer.java | 2 +- .../client/impl/consumer/AssignedMessageQueue.java | 4 +++ .../impl/consumer/DefaultLitePullConsumerImpl.java | 38 ++++++---------------- 3 files changed, 15 insertions(+), 29 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java index 782d29b..bfef761 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java @@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon @Override public void commitSync() { - this.defaultLitePullConsumerImpl.commitSync(); + this.defaultLitePullConsumerImpl.commitAll(); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index 0b090e3..fad0b4f 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -177,6 +177,10 @@ public class AssignedMessageQueue { } } + public Set<MessageQueue> getAssignedMessageQueues() { + return this.assignedMessageQueueState.keySet(); + } + private class MessageQueueState { private MessageQueue messageQueue; private ProcessQueue processQueue; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java index 
c3eb7fb..8483da6 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java @@ -590,37 +590,14 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner { } } - public synchronized void commitSync() { + public synchronized void commitAll() { try { for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); if (consumerOffset != -1) { ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { + if (processQueue != null && !processQueue.isDropped()) { updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, false); - } - } - } - if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) { - offsetStore.persistAll(assignedMessageQueue.messageQueues()); - } - } catch (Exception e) { - log.error("An error occurred when update consume offset synchronously.", e); - } - } - - private synchronized void commitAll() { - try { - for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) { - long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue); - if (consumerOffset != -1) { - ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue); - long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY); - if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) { - updateConsumeOffset(messageQueue, consumerOffset); - updateConsumeOffsetToBroker(messageQueue, consumerOffset, true); } } } @@ -927,11 +904,16 @@ 
public class DefaultLitePullConsumerImpl implements MQConsumerInner { try { checkServiceState(); Set<MessageQueue> mqs = new HashSet<MessageQueue>(); - Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); - mqs.addAll(allocateMq); + if (this.subscriptionType == SubscriptionType.SUBSCRIBE) { + Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet(); + mqs.addAll(allocateMq); + } else if (this.subscriptionType == SubscriptionType.ASSIGN) { + Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues(); + mqs.addAll(assignedMessageQueue); + } this.offsetStore.persistAll(mqs); } catch (Exception e) { - log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e); + log.error("Persist consumer offset error for group: {} ", this.defaultLitePullConsumer.getConsumerGroup(), e); } }
