This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7903c7041 [ISSUE #6662] Optimize the process of HA's confirmOffset
calculation (#6663)
7903c7041 is described below
commit 7903c70412fc4d20baafd7b75ca93788551fb88f
Author: Ji Juntao <[email protected]>
AuthorDate: Mon May 1 14:35:52 2023 +0800
[ISSUE #6662] Optimize the process of HA's confirmOffset calculation (#6663)
* When computing confirmOffset, check whether all slaves in syncStateSet are
connected to the master.
* Fix: use brokerControllerId instead of brokerId.
* Optimize the code: 1. Set brokerId when the ReplicasManager is initialized. 2.
Rename the confusing variable name.
* set haService's brokerControllerId after registered.
* Set the haService's brokerControllerId when the ReplicasManager starts
without a brokerIdentity file.
---
.../broker/controller/ReplicasManager.java | 2 ++
.../store/ha/autoswitch/AutoSwitchHAService.java | 29 +++++++++++++++++++---
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index c39e33ad1..3c7e061a2 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -549,6 +549,7 @@ public class ReplicasManager {
this.brokerMetadata.updateAndPersist(brokerConfig.getBrokerClusterName(),
brokerConfig.getBrokerName(), tempBrokerMetadata.getBrokerId());
this.tempBrokerMetadata.clear();
this.brokerControllerId = this.brokerMetadata.getBrokerId();
+ this.haService.setBrokerControllerId(this.brokerControllerId);
return true;
} catch (Exception e) {
LOGGER.error("fail to create metadata file", e);
@@ -600,6 +601,7 @@ public class ReplicasManager {
if (this.brokerMetadata.isLoaded()) {
this.registerState = RegisterState.CREATE_METADATA_FILE_DONE;
this.brokerControllerId = brokerMetadata.getBrokerId();
+ this.haService.setBrokerControllerId(this.brokerControllerId);
return;
}
// 2. check if temp metadata exist
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 80249bc32..d1e623ca7 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -50,6 +51,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/**
* SwitchAble ha service, support switch role to master or slave.
@@ -72,6 +74,8 @@ public class AutoSwitchHAService extends DefaultHAService {
private EpochFileCache epochCache;
private AutoSwitchHAClient haClient;
+ private Long brokerControllerId = null;
+
public AutoSwitchHAService() {
}
@@ -427,14 +431,25 @@ public class AutoSwitchHAService extends DefaultHAService
{
private long computeConfirmOffset() {
final Set<Long> currentSyncStateSet = getSyncStateSet();
- long confirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+ long newConfirmOffset = this.defaultMessageStore.getMaxPhyOffset();
+ List<Long> idList = this.connectionList.stream().map(connection ->
((AutoSwitchHAConnection)connection).getSlaveId()).collect(Collectors.toList());
+
+ // To avoid the syncStateSet is not consistent with connectionList.
+ // Fix issue: https://github.com/apache/rocketmq/issues/6662
+ for (Long syncId : currentSyncStateSet) {
+ if (!idList.contains(syncId) && this.brokerControllerId != null &&
!Objects.equals(syncId, this.brokerControllerId)) {
+ LOGGER.warn("Slave {} is still in syncStateSet, but has lost
its connection. So new offset can't be compute.", syncId);
+ return this.confirmOffset;
+ }
+ }
+
for (HAConnection connection : this.connectionList) {
final Long slaveId = ((AutoSwitchHAConnection)
connection).getSlaveId();
if (currentSyncStateSet.contains(slaveId)) {
- confirmOffset = Math.min(confirmOffset,
connection.getSlaveAckOffset());
+ newConfirmOffset = Math.min(newConfirmOffset,
connection.getSlaveAckOffset());
}
}
- return confirmOffset;
+ return newConfirmOffset;
}
public void setSyncStateSet(final Set<Long> syncStateSet) {
@@ -545,6 +560,14 @@ public class AutoSwitchHAService extends DefaultHAService {
return this.epochCache.getAllEntries();
}
+ public Long getBrokerControllerId() {
+ return brokerControllerId;
+ }
+
+ public void setBrokerControllerId(Long brokerControllerId) {
+ this.brokerControllerId = brokerControllerId;
+ }
+
class AutoSwitchAcceptSocketService extends AcceptSocketService {
public AutoSwitchAcceptSocketService(final MessageStoreConfig
messageStoreConfig) {