mxsm commented on code in PR #5798:
URL: https://github.com/apache/rocketmq/pull/5798#discussion_r1062136304


##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -258,12 +269,38 @@ public void maybeExpandInSyncStateSet(final String slaveAddress, final long slav
             final EpochEntry currentLeaderEpoch = this.epochCache.lastEntry();
             if (slaveMaxOffset >= currentLeaderEpoch.getStartOffset()) {
                 currentSyncStateSet.add(slaveAddress);
+                markSynchronizingSyncStateSet(currentSyncStateSet);
                 // Notify the upper layer that syncStateSet changed.
                 notifySyncStateSetChanged(currentSyncStateSet);
             }
         }
     }
 
+    private void markSynchronizingSyncStateSet(final Set<String> newSyncStateSet) {
+        this.writeLock.lock();
+        try {
+            this.isSynchronizingSyncStateSet = true;
+            this.remoteSyncStateSet.clear();
+            this.remoteSyncStateSet.addAll(newSyncStateSet);
+        } finally {
+            this.writeLock.unlock();
+        }
+    }
+
+    private void markSynchronizingSyncStateSetDone() {
+        this.writeLock.lock();
+        try {
+            this.isSynchronizingSyncStateSet = false;
+            this.remoteSyncStateSet.clear();
+        } finally {
+            this.writeLock.unlock();
+        }
+    }

Review Comment:
   The lock is not required here. This method is invoked by the setSyncStateSet
   method, which already holds the write lock:
   ```
       public void setSyncStateSet(final Set<String> syncStateSet) {
           this.writeLock.lock();
           try {
               markSynchronizingSyncStateSetDone();
               this.syncStateSet.clear();
               this.syncStateSet.addAll(syncStateSet);
               this.confirmOffset = computeConfirmOffset();
           } finally {
               this.writeLock.unlock();
           }
       }
   ```
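
A minimal sketch of the point above, assuming `writeLock` comes from a `java.util.concurrent.locks.ReentrantReadWriteLock` (the diff suggests this but does not show the field declaration). The class name, the `helperTakesLock` flag, and the `maxWriteHoldCount` field are hypothetical and exist only for illustration. Because the write lock is reentrant, taking it again inside the helper is harmless but redundant: it only raises the hold count from 1 to 2.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReentrantWriteLockSketch {
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final Set<String> syncStateSet = new HashSet<>();
    private final Set<String> remoteSyncStateSet = new HashSet<>();
    private boolean isSynchronizingSyncStateSet = true;

    // Hypothetical probe: highest write-lock hold count seen inside the helper.
    int maxWriteHoldCount = 0;

    // Variant as in the PR: the helper takes the write lock itself.
    private void markDoneWithLock() {
        readWriteLock.writeLock().lock();
        try {
            maxWriteHoldCount = Math.max(maxWriteHoldCount, readWriteLock.getWriteHoldCount());
            clearRemoteState();
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    // Variant suggested in the review: rely on the caller's write lock.
    private void markDoneWithoutLock() {
        if (!readWriteLock.isWriteLockedByCurrentThread()) {
            throw new IllegalStateException("caller must hold the write lock");
        }
        maxWriteHoldCount = Math.max(maxWriteHoldCount, readWriteLock.getWriteHoldCount());
        clearRemoteState();
    }

    private void clearRemoteState() {
        isSynchronizingSyncStateSet = false;
        remoteSyncStateSet.clear();
    }

    public void setSyncStateSet(Set<String> newSet, boolean helperTakesLock) {
        readWriteLock.writeLock().lock();
        try {
            if (helperTakesLock) {
                markDoneWithLock();
            } else {
                markDoneWithoutLock();
            }
            syncStateSet.clear();
            syncStateSet.addAll(newSet);
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    boolean isSynchronizing() {
        return isSynchronizingSyncStateSet;
    }

    public static void main(String[] args) {
        ReentrantWriteLockSketch nested = new ReentrantWriteLockSketch();
        nested.setSyncStateSet(new HashSet<>(Arrays.asList("broker-a")), true);
        System.out.println(nested.maxWriteHoldCount); // prints 2

        ReentrantWriteLockSketch flat = new ReentrantWriteLockSketch();
        flat.setSyncStateSet(new HashSet<>(Arrays.asList("broker-a")), false);
        System.out.println(flat.maxWriteHoldCount); // prints 1
    }
}
```

Both variants leave the state identical. Dropping the inner lock is only safe if every caller of the helper already holds the write lock, which is why the unlocked variant checks `isWriteLockedByCurrentThread()`.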



##########
store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java:
##########
@@ -356,26 +402,44 @@ private long computeConfirmOffset() {
     }
 
     public void setSyncStateSet(final Set<String> syncStateSet) {
-        final Lock writeLock = readWriteLock.writeLock();
+        this.writeLock.lock();
         try {
-            writeLock.lock();
+            markSynchronizingSyncStateSetDone();
             this.syncStateSet.clear();
             this.syncStateSet.addAll(syncStateSet);
             this.confirmOffset = computeConfirmOffset();
         } finally {
-            writeLock.unlock();
+            this.writeLock.unlock();
         }
     }
 
+    /**
+     * Return the union of the local and remote syncStateSets
+     */
     public Set<String> getSyncStateSet() {
-        final Lock readLock = readWriteLock.readLock();
+        this.readLock.lock();
+        try {
+            if (this.isSynchronizingSyncStateSet) {
+                Set<String> unionSyncStateSet = new HashSet<>(this.syncStateSet.size() + this.remoteSyncStateSet.size());
+                unionSyncStateSet.addAll(this.syncStateSet);
+                unionSyncStateSet.addAll(this.remoteSyncStateSet);
+                return unionSyncStateSet;
+            } else {
+                return new HashSet<>(this.syncStateSet);
+            }

Review Comment:
   Consider setting the HashSet size here.
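
The comment does not say which sizing is meant. One possible reading, sketched below, is that the constructor argument of `HashSet(int)` is an initial capacity, not an element count: with the default load factor of 0.75, a set created with capacity `n` can still resize before it holds `n` elements. The class and the `capacityFor` helper are hypothetical, not part of RocketMQ.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class UnionSetSizingSketch {
    // Default load factor used by HashSet(int initialCapacity).
    private static final float DEFAULT_LOAD_FACTOR = 0.75f;

    // Capacity large enough to hold expectedElements without a resize.
    static int capacityFor(int expectedElements) {
        return (int) (expectedElements / DEFAULT_LOAD_FACTOR) + 1;
    }

    // Union of two sets, pre-sized for the worst case of no overlap.
    static Set<String> union(Set<String> local, Set<String> remote) {
        Set<String> result = new HashSet<>(capacityFor(local.size() + remote.size()));
        result.addAll(local);
        result.addAll(remote);
        return result;
    }

    public static void main(String[] args) {
        Set<String> local = new HashSet<>(Arrays.asList("broker-a:10911", "broker-b:10911"));
        Set<String> remote = new HashSet<>(Arrays.asList("broker-b:10911", "broker-c:10911"));
        System.out.println(union(local, remote).size()); // prints 3
    }
}
```

For sets as small as a broker's syncStateSet the difference is negligible, so this is a matter of tidiness more than performance.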



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
