This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5dc2e20ef [ISSUE #6728] Compute the confirmOffset without considering 
new connections (#6729)
5dc2e20ef is described below

commit 5dc2e20efc9866f0f9d4f242d41ad4b4cfc65644
Author: Ji Juntao <[email protected]>
AuthorDate: Thu May 11 11:30:16 2023 +0800

    [ISSUE #6728] Compute the confirmOffset without considering new connections 
(#6729)
    
    * 1. When computing the confirmOffset, ignore the ackOffset of new 
connections. 2. When computing the confirmOffset, use getConfirmOffsetDirectly() 
to avoid endless recursion between getConfirmOffset() and computeConfirmOffset().
    
    * use the calculated slaveAckOffset
    
    * optimize the logic.
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 22 +++++++++++++++++++++-
 .../apache/rocketmq/store/DefaultMessageStore.java |  8 ++++++++
 .../store/ha/autoswitch/AutoSwitchHAService.java   |  5 +++--
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index ed5e320be..56f19529d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -553,15 +553,35 @@ public class CommitLog implements Swappable {
         }
     }
 
+    // Fetch and compute the newest confirmOffset.
+    // Even if it is just inited.
     public long getConfirmOffset() {
         if 
(this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
             if 
(this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != 
BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
                 if (((AutoSwitchHAService) 
this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
                     return this.defaultMessageStore.getMaxPhyOffset();
                 }
-                // First time compute confirmOffset.
+                // First time it will compute the confirmOffset.
                 if (this.confirmOffset <= 0) {
                     setConfirmOffset(((AutoSwitchHAService) 
this.defaultMessageStore.getHaService()).computeConfirmOffset());
+                    log.info("Init the confirmOffset to {}.", 
this.confirmOffset);
+                }
+            }
+            return this.confirmOffset;
+        } else if 
(this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
+            return this.confirmOffset;
+        } else {
+            return getMaxOffset();
+        }
+    }
+
+    // Fetch the original confirmOffset's value.
+    // Without checking and re-computing.
+    public long getConfirmOffsetDirectly() {
+        if 
(this.defaultMessageStore.getBrokerConfig().isEnableControllerMode()) {
+            if 
(this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() != 
BrokerRole.SLAVE && !this.defaultMessageStore.getRunningFlags().isFenced()) {
+                if (((AutoSwitchHAService) 
this.defaultMessageStore.getHaService()).getLocalSyncStateSet().size() == 1) {
+                    return this.defaultMessageStore.getMaxPhyOffset();
                 }
             }
             return this.confirmOffset;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java 
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 0b1f69ee7..6b0516b04 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -1589,11 +1589,19 @@ public class DefaultMessageStore implements 
MessageStore {
         }
     }
 
+    // Fetch and compute the newest confirmOffset.
+    // Even if it is just inited.
     @Override
     public long getConfirmOffset() {
         return this.commitLog.getConfirmOffset();
     }
 
+    // Fetch the original confirmOffset's value.
+    // Without checking and re-computing.
+    public long getConfirmOffsetDirectly() {
+        return this.commitLog.getConfirmOffsetDirectly();
+    }
+
     @Override
     public void setConfirmOffset(long phyOffset) {
         this.commitLog.setConfirmOffset(phyOffset);
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 3a918ee8e..6dc734e0c 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -421,13 +421,14 @@ public class AutoSwitchHAService extends DefaultHAService 
{
         for (Long syncId : currentSyncStateSet) {
             if (!idList.contains(syncId) && this.brokerControllerId != null && 
!Objects.equals(syncId, this.brokerControllerId)) {
                 LOGGER.warn("Slave {} is still in syncStateSet, but has lost 
its connection. So new offset can't be compute.", syncId);
-                return this.defaultMessageStore.getConfirmOffset();
+                // Without check and re-compute, return the confirmOffset's 
value directly.
+                return this.defaultMessageStore.getConfirmOffsetDirectly();
             }
         }
 
         for (HAConnection connection : this.connectionList) {
             final Long slaveId = ((AutoSwitchHAConnection) 
connection).getSlaveId();
-            if (currentSyncStateSet.contains(slaveId)) {
+            if (currentSyncStateSet.contains(slaveId) && 
connection.getSlaveAckOffset() > 0) {
                 newConfirmOffset = Math.min(newConfirmOffset, 
connection.getSlaveAckOffset());
             }
         }

Reply via email to