This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d5d5a637f [ISSUE #6226] Shutdown flowMonitor when connection
disconnect (#6227)
d5d5a637f is described below
commit d5d5a637f6c4f2eeb9430e068c8d620b4010c16d
Author: rongtong <[email protected]>
AuthorDate: Fri Mar 3 22:28:15 2023 +0800
[ISSUE #6226] Shutdown flowMonitor when connection disconnect (#6227)
* Shutdown flowMonitor when connection disconnect
* Whether the master is existed has more accurate judgment
* Shutdown flowMonitor when HAClient service end
* Pass the check style
* Modify according to comments
---
.../org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java | 3 ++-
.../src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java | 2 ++
.../main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java | 4 ++++
.../org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java | 3 +++
.../apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java | 5 +++++
5 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
index 997dee3c5..2674dabf5 100644
---
a/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
+++
b/controller/src/main/java/org/apache/rocketmq/controller/impl/manager/SyncStateInfo.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.controller.impl.manager;
import java.util.HashSet;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
/**
* Manages the syncStateSet of broker replicas.
@@ -53,7 +54,7 @@ public class SyncStateInfo {
}
public boolean isMasterExist() {
- return !this.masterAddress.isEmpty();
+ return !StringUtils.isBlank(masterAddress);
}
public String getClusterName() {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
index 06878c185..530d295ae 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAClient.java
@@ -309,6 +309,7 @@ public class DefaultHAClient extends ServiceThread
implements HAClient {
try {
switch (this.currentState) {
case SHUTDOWN:
+ this.flowMonitor.shutdown(true);
return;
case READY:
if (!this.connectMaster()) {
@@ -339,6 +340,7 @@ public class DefaultHAClient extends ServiceThread
implements HAClient {
}
}
+ this.flowMonitor.shutdown(true);
log.info(this.getServiceName() + " service end");
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
index e05e0ce23..5dd24410e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java
@@ -195,6 +195,8 @@ public class DefaultHAConnection implements HAConnection {
log.error("", e);
}
+ flowMonitor.shutdown(true);
+
log.info(this.getServiceName() + " service end");
}
@@ -398,6 +400,8 @@ public class DefaultHAConnection implements HAConnection {
DefaultHAConnection.log.error("", e);
}
+ flowMonitor.shutdown(true);
+
DefaultHAConnection.log.info(this.getServiceName() + " service
end");
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
index 554c64fd1..b95d3814a 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAClient.java
@@ -398,6 +398,7 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
try {
switch (this.currentState) {
case SHUTDOWN:
+ this.flowMonitor.shutdown(true);
return;
case READY:
// Truncate invalid msg first
@@ -437,6 +438,8 @@ public class AutoSwitchHAClient extends ServiceThread
implements HAClient {
}
}
+ this.flowMonitor.shutdown(true);
+ LOGGER.info(this.getServiceName() + " service end");
}
/**
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 7401574e5..57f9e9619 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -294,6 +294,8 @@ public class AutoSwitchHAConnection implements HAConnection
{
AutoSwitchHAConnection.LOGGER.error("", e);
}
+ flowMonitor.shutdown(true);
+
AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + "
service end");
}
@@ -739,6 +741,9 @@ public class AutoSwitchHAConnection implements HAConnection
{
} catch (IOException e) {
AutoSwitchHAConnection.LOGGER.error("", e);
}
+
+ flowMonitor.shutdown(true);
+
AutoSwitchHAConnection.LOGGER.info(this.getServiceName() + "
service end");
}