This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 017ad11047 [ISSUE #7219] Fix Concurrent modify syncStateSet and Mark
synchronizing frequently when shrink. (#7220)
017ad11047 is described below
commit 017ad110475e8024585327b44f47e5e97aabc63b
Author: echooymxq <[email protected]>
AuthorDate: Wed Aug 23 11:11:42 2023 +0800
[ISSUE #7219] Fix Concurrent modify syncStateSet and Mark synchronizing
frequently when shrink. (#7220)
---
.../broker/controller/ReplicasManager.java | 29 ++++++++++++----------
.../store/ha/autoswitch/AutoSwitchHAService.java | 21 +++++++++-------
2 files changed, 28 insertions(+), 22 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index abae7cdb01..37c82e434b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -542,7 +542,7 @@ public class ReplicasManager {
this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(),
brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
this.tempBrokerMetadata.clear();
this.brokerControllerId = this.brokerMetadata.getBrokerId();
- this.haService.setBrokerControllerId(this.brokerControllerId);
+ this.haService.setLocalBrokerId(this.brokerControllerId);
return true;
} catch (Exception e) {
LOGGER.error("fail to create metadata file", e);
@@ -594,7 +594,7 @@ public class ReplicasManager {
if (this.brokerMetadata.isLoaded()) {
this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
this.brokerControllerId = brokerMetadata.getBrokerId();
- this.haService.setBrokerControllerId(this.brokerControllerId);
+ this.haService.setLocalBrokerId(this.brokerControllerId);
return;
}
// 2. check if temp metadata exist
@@ -735,23 +735,26 @@ public class ReplicasManager {
if (this.checkSyncStateSetTaskFuture != null) {
this.checkSyncStateSetTaskFuture.cancel(false);
}
- this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(() -> {
- checkSyncStateSetAndDoReport();
- }, 3 * 1000, this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
+ this.checkSyncStateSetTaskFuture = this.scheduledService.scheduleAtFixedRate(this::checkSyncStateSetAndDoReport, 3 * 1000,
+ this.brokerConfig.getCheckSyncStateSetPeriod(), TimeUnit.MILLISECONDS);
}
private void checkSyncStateSetAndDoReport() {
- final Set<Long> newSyncStateSet =
this.haService.maybeShrinkSyncStateSet();
- newSyncStateSet.add(this.brokerControllerId);
- synchronized (this) {
- if (this.syncStateSet != null) {
- // Check if syncStateSet changed
- if (this.syncStateSet.size() == newSyncStateSet.size() &&
this.syncStateSet.containsAll(newSyncStateSet)) {
- return;
+ try {
+ final Set<Long> newSyncStateSet =
this.haService.maybeShrinkSyncStateSet();
+ newSyncStateSet.add(this.brokerControllerId);
+ synchronized (this) {
+ if (this.syncStateSet != null) {
+ // Check if syncStateSet changed
+ if (this.syncStateSet.size() == newSyncStateSet.size() &&
this.syncStateSet.containsAll(newSyncStateSet)) {
+ return;
+ }
}
}
+ doReportSyncStateSetChanged(newSyncStateSet);
+ } catch (Exception e) {
+ LOGGER.error("Check syncStateSet error", e);
}
- doReportSyncStateSetChanged(newSyncStateSet);
}
private void doReportSyncStateSetChanged(Set<Long> newSyncStateSet) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 6dc734e0c9..d5393fdca4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -41,6 +41,7 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
+import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -73,7 +74,7 @@ public class AutoSwitchHAService extends DefaultHAService {
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
- private Long brokerControllerId = null;
+ private Long localBrokerId = null;
public AutoSwitchHAService() {
}
@@ -287,9 +288,11 @@ public class AutoSwitchHAService extends DefaultHAService {
// If the slaveBrokerId is in syncStateSet but not in connectionCaughtUpTimeTable,
// it means that the broker has not connected.
- for (Long slaveBrokerId : newSyncStateSet) {
- if (!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) {
- newSyncStateSet.remove(slaveBrokerId);
+ Iterator<Long> iterator = newSyncStateSet.iterator();
+ while (iterator.hasNext()) {
+ Long slaveBrokerId = iterator.next();
+ if (!Objects.equals(slaveBrokerId, this.localBrokerId) &&
!this.connectionCaughtUpTimeTable.containsKey(slaveBrokerId)) {
+ iterator.remove();
isSyncStateSetChanged = true;
}
}
@@ -419,7 +422,7 @@ public class AutoSwitchHAService extends DefaultHAService {
// To avoid the syncStateSet is not consistent with connectionList.
// Fix issue: https://github.com/apache/rocketmq/issues/6662
for (Long syncId : currentSyncStateSet) {
- if (!idList.contains(syncId) && this.brokerControllerId != null &&
!Objects.equals(syncId, this.brokerControllerId)) {
+ if (!idList.contains(syncId) && this.localBrokerId != null &&
!Objects.equals(syncId, this.localBrokerId)) {
LOGGER.warn("Slave {} is still in syncStateSet, but has lost its connection. So new offset can't be compute.", syncId);
// Without check and re-compute, return the confirmOffset's value directly.
return this.defaultMessageStore.getConfirmOffsetDirectly();
@@ -545,12 +548,12 @@ public class AutoSwitchHAService extends DefaultHAService {
return this.epochCache.getAllEntries();
}
- public Long getBrokerControllerId() {
- return brokerControllerId;
+ public Long getLocalBrokerId() {
+ return localBrokerId;
}
- public void setBrokerControllerId(Long brokerControllerId) {
- this.brokerControllerId = brokerControllerId;
+ public void setLocalBrokerId(Long localBrokerId) {
+ this.localBrokerId = localBrokerId;
}
class AutoSwitchAcceptSocketService extends AcceptSocketService {