This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch test-release
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9c39ae6bba97dbf84063682c86c782f602fe8c32
Author: duhenglucky <[email protected]>
AuthorDate: Tue Jan 7 20:50:13 2020 +0800

    feat(pull_consumer) refactor the consumer offset update logic
---
 .../client/consumer/DefaultLitePullConsumer.java   |  2 +-
 .../client/impl/consumer/AssignedMessageQueue.java |  4 +++
 .../impl/consumer/DefaultLitePullConsumerImpl.java | 38 ++++++----------------
 3 files changed, 15 insertions(+), 29 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 782d29b..bfef761 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -269,7 +269,7 @@ public class DefaultLitePullConsumer extends ClientConfig 
implements LitePullCon
 
     @Override
     public void commitSync() {
-        this.defaultLitePullConsumerImpl.commitSync();
+        this.defaultLitePullConsumerImpl.commitAll();
     }
 
     @Override
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index 0b090e3..fad0b4f 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -177,6 +177,10 @@ public class AssignedMessageQueue {
         }
     }
 
+    public Set<MessageQueue> getAssignedMessageQueues() {
+        return this.assignedMessageQueueState.keySet();
+    }
+
     private class MessageQueueState {
         private MessageQueue messageQueue;
         private ProcessQueue processQueue;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index c3eb7fb..8483da6 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -590,37 +590,14 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         }
     }
 
-    public synchronized void commitSync() {
+    public synchronized void commitAll() {
         try {
             for (MessageQueue messageQueue : 
assignedMessageQueue.messageQueues()) {
                 long consumerOffset = 
assignedMessageQueue.getConsumerOffset(messageQueue);
                 if (consumerOffset != -1) {
                     ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
-                    long preConsumerOffset = 
this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
-                    if (processQueue != null && !processQueue.isDropped() && 
consumerOffset != preConsumerOffset) {
+                    if (processQueue != null && !processQueue.isDropped()) {
                         updateConsumeOffset(messageQueue, consumerOffset);
-                        updateConsumeOffsetToBroker(messageQueue, 
consumerOffset, false);
-                    }
-                }
-            }
-            if (defaultLitePullConsumer.getMessageModel() == 
MessageModel.BROADCASTING) {
-                offsetStore.persistAll(assignedMessageQueue.messageQueues());
-            }
-        } catch (Exception e) {
-            log.error("An error occurred when update consume offset 
synchronously.", e);
-        }
-    }
-
-    private synchronized void commitAll() {
-        try {
-            for (MessageQueue messageQueue : 
assignedMessageQueue.messageQueues()) {
-                long consumerOffset = 
assignedMessageQueue.getConsumerOffset(messageQueue);
-                if (consumerOffset != -1) {
-                    ProcessQueue processQueue = 
assignedMessageQueue.getProcessQueue(messageQueue);
-                    long preConsumerOffset = 
this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
-                    if (processQueue != null && !processQueue.isDropped() && 
consumerOffset != preConsumerOffset) {
-                        updateConsumeOffset(messageQueue, consumerOffset);
-                        updateConsumeOffsetToBroker(messageQueue, 
consumerOffset, true);
                     }
                 }
             }
@@ -927,11 +904,16 @@ public class DefaultLitePullConsumerImpl implements 
MQConsumerInner {
         try {
             checkServiceState();
             Set<MessageQueue> mqs = new HashSet<MessageQueue>();
-            Set<MessageQueue> allocateMq = 
this.rebalanceImpl.getProcessQueueTable().keySet();
-            mqs.addAll(allocateMq);
+            if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {
+                Set<MessageQueue> allocateMq = 
this.rebalanceImpl.getProcessQueueTable().keySet();
+                mqs.addAll(allocateMq);
+            } else if (this.subscriptionType == SubscriptionType.ASSIGN) {
+                Set<MessageQueue> assignedMessageQueue = 
this.assignedMessageQueue.getAssignedMessageQueues();
+                mqs.addAll(assignedMessageQueue);
+            }
             this.offsetStore.persistAll(mqs);
         } catch (Exception e) {
-            log.error("group: " + 
this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset 
exception", e);
+            log.error("Persist consumer offset error for group: {} ", 
this.defaultLitePullConsumer.getConsumerGroup(), e);
         }
     }
 

Reply via email to