This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f1e02e894 [ISSUE #6392] GetSyncStateSet prints replica's alive status (#6393)
f1e02e894 is described below
commit f1e02e8947b6a5ac692b1c07d9baf8a6f3a394b0
Author: Ji Juntao <[email protected]>
AuthorDate: Mon Mar 20 13:53:07 2023 +0800
[ISSUE #6392] GetSyncStateSet prints replica's alive status (#6393)
* print replica's alive status
* refactor
* rename getIsAlive to getAlive and setIsAlive to setAlive.
---
.../rocketmq/controller/impl/DLedgerController.java | 2 +-
.../controller/impl/manager/ReplicasInfoManager.java | 16 ++++++++++++----
.../controller/impl/manager/ReplicasInfoManagerTest.java | 2 +-
.../remoting/protocol/body/BrokerReplicasInfo.java | 14 +++++++++++---
4 files changed, 25 insertions(+), 9 deletions(-)
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index 418e3902c..491cb16d1 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -190,7 +190,7 @@ public class DLedgerController implements Controller {
@Override
public CompletableFuture<RemotingCommand> getSyncStateData(List<String>
brokerNames) {
return this.scheduler.appendEvent("getSyncStateData",
- () -> this.replicasInfoManager.getSyncStateData(brokerNames),
false);
+ () -> this.replicasInfoManager.getSyncStateData(brokerNames,
brokerAlivePredicate), false);
}
@Override
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
index 103cb68e2..2f5c3307c 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManager.java
@@ -353,7 +353,7 @@ public class ReplicasInfoManager {
return result;
}
- public ControllerResult<Void> getSyncStateData(final List<String>
brokerNames) {
+ public ControllerResult<Void> getSyncStateData(final List<String>
brokerNames, final BrokerValidPredicate brokerAlivePredicate) {
final ControllerResult<Void> result = new ControllerResult<>();
final BrokerReplicasInfo brokerReplicasInfo = new BrokerReplicasInfo();
for (String brokerName : brokerNames) {
@@ -366,15 +366,23 @@ public class ReplicasInfoManager {
final ArrayList<BrokerReplicasInfo.ReplicaIdentity>
inSyncReplicas = new ArrayList<>();
final ArrayList<BrokerReplicasInfo.ReplicaIdentity>
notInSyncReplicas = new ArrayList<>();
+ if (brokerReplicaInfo == null) {
+ continue;
+ }
+
brokerReplicaInfo.getBrokerIdTable().forEach((brokerId,
brokerAddress) -> {
+ Boolean isAlive =
brokerAlivePredicate.check(brokerReplicaInfo.getClusterName(), brokerName,
brokerId);
+ BrokerReplicasInfo.ReplicaIdentity replica = new
BrokerReplicasInfo.ReplicaIdentity(brokerName, brokerId, brokerAddress);
+ replica.setAlive(isAlive);
if (syncStateSet.contains(brokerId)) {
- inSyncReplicas.add(new
BrokerReplicasInfo.ReplicaIdentity(brokerName, brokerId, brokerAddress));
+ inSyncReplicas.add(replica);
} else {
- notInSyncReplicas.add(new
BrokerReplicasInfo.ReplicaIdentity(brokerName, brokerId, brokerAddress));
+ notInSyncReplicas.add(replica);
}
});
- final BrokerReplicasInfo.ReplicasInfo inSyncState = new
BrokerReplicasInfo.ReplicasInfo(masterBrokerId,
brokerReplicaInfo.getBrokerAddress(masterBrokerId),
syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(),
inSyncReplicas, notInSyncReplicas);
+ final BrokerReplicasInfo.ReplicasInfo inSyncState = new
BrokerReplicasInfo.ReplicasInfo(masterBrokerId,
brokerReplicaInfo.getBrokerAddress(masterBrokerId),
syncStateInfo.getMasterEpoch(), syncStateInfo.getSyncStateSetEpoch(),
+ inSyncReplicas, notInSyncReplicas);
brokerReplicasInfo.addReplicaInfo(brokerName, inSyncState);
}
}
diff --git
a/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
index 19411e778..e2c32b03b 100644
---
a/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
+++
b/controller/src/test/java/org/apache/rocketmq/controller/impl/manager/ReplicasInfoManagerTest.java
@@ -87,7 +87,7 @@ public class ReplicasInfoManagerTest {
}
private BrokerReplicasInfo.ReplicasInfo getReplicasInfo(String brokerName)
{
- ControllerResult<Void> syncStateData =
this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName));
+ ControllerResult<Void> syncStateData =
this.replicasInfoManager.getSyncStateData(Arrays.asList(brokerName), (a, b, c)
-> true);
BrokerReplicasInfo replicasInfo =
RemotingSerializable.decode(syncStateData.getBody(), BrokerReplicasInfo.class);
return replicasInfo.getReplicasInfoTable().get(brokerName);
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
index f7ceb82d7..f912e4e8e 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/body/BrokerReplicasInfo.java
@@ -54,8 +54,7 @@ public class BrokerReplicasInfo extends RemotingSerializable
{
private List<ReplicaIdentity> notInSyncReplicas;
public ReplicasInfo(Long masterBrokerId, String masterAddress, int
masterEpoch, int syncStateSetEpoch,
- List<ReplicaIdentity> inSyncReplicas,
- List<ReplicaIdentity> notInSyncReplicas) {
+ List<ReplicaIdentity> inSyncReplicas,
List<ReplicaIdentity> notInSyncReplicas) {
this.masterBrokerId = masterBrokerId;
this.masterAddress = masterAddress;
this.masterEpoch = masterEpoch;
@@ -125,7 +124,6 @@ public class BrokerReplicasInfo extends
RemotingSerializable {
public boolean isExistInAllReplicas(String brokerName, Long brokerId,
String brokerAddress) {
return this.isExistInSync(brokerName, brokerId, brokerAddress) ||
this.isExistInNotSync(brokerName, brokerId, brokerAddress);
}
-
}
public static class ReplicaIdentity extends RemotingSerializable {
@@ -133,6 +131,7 @@ public class BrokerReplicasInfo extends
RemotingSerializable {
private Long brokerId;
private String brokerAddress;
+ private Boolean isAlive = false;
public ReplicaIdentity(String brokerName, Long brokerId, String
brokerAddress) {
this.brokerName = brokerName;
@@ -164,12 +163,21 @@ public class BrokerReplicasInfo extends
RemotingSerializable {
this.brokerId = brokerId;
}
+ public Boolean getAlive() {
+ return isAlive;
+ }
+
+ public void setAlive(Boolean alive) {
+ isAlive = alive;
+ }
+
@Override
public String toString() {
return "ReplicaIdentity{" +
"brokerName='" + brokerName + '\'' +
", brokerId=" + brokerId +
", brokerAddress='" + brokerAddress + '\'' +
+ ", isAlive=" + isAlive +
'}';
}