This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 6fd0073d64 [ISSUE #7319] Optimize fault-tolerant mechanism for sending
messages and hot update switch (#7320)
6fd0073d64 is described below
commit 6fd0073d6475c539e8f4c30dc4f104a56a21d724
Author: Ji Juntao <[email protected]>
AuthorDate: Thu Sep 7 20:21:16 2023 +0800
[ISSUE #7319] Optimize fault-tolerant mechanism for sending messages and
hot update switch (#7320)
---
.../client/impl/producer/DefaultMQProducerImpl.java | 8 ++------
.../client/latency/LatencyFaultTolerance.java | 14 ++++++++++++++
.../client/latency/LatencyFaultToleranceImpl.java | 13 ++++++++++++-
.../rocketmq/client/latency/MQFaultStrategy.java | 20 +++++++-------------
.../proxy/service/route/MessageQueueView.java | 9 ---------
.../proxy/service/route/TopicRouteService.java | 10 +++++++++-
6 files changed, 44 insertions(+), 30 deletions(-)
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2d6b83ac2c..b0c212e46b 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -263,9 +263,7 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
mQClientFactory.start();
}
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
- this.mqFaultStrategy.startDetector();
- }
+ this.mqFaultStrategy.startDetector();
log.info("the producer [{}] start OK.
sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
this.defaultMQProducer.isSendMessageWithVIPChannel());
@@ -311,9 +309,7 @@ public class DefaultMQProducerImpl implements
MQProducerInner {
if (shutdownFactory) {
this.mQClientFactory.shutdown();
}
- if (this.mqFaultStrategy.isStartDetectorEnable()) {
- this.mqFaultStrategy.shutdown();
- }
+ this.mqFaultStrategy.shutdown();
RequestFutureHolder.getInstance().shutdown(this);
log.info("the producer [{}] shutdown OK",
this.defaultMQProducer.getProducerGroup());
this.serviceState = ServiceState.SHUTDOWN_ALREADY;
diff --git
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
index 72d2f34500..17aaa266aa 100644
---
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
+++
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
@@ -89,4 +89,18 @@ public interface LatencyFaultTolerance<T> {
* @param detectInterval each broker's detecting interval
*/
void setDetectInterval(final int detectInterval);
+
+ /**
+ * Enables or disables the detector.
+ *
+ * @param startDetectorEnable whether the detector should do its detection work
+ */
+ void setStartDetectorEnable(final boolean startDetectorEnable);
+
+ /**
+ * Reports whether the detector is enabled.
+ *
+ * @return whether the detector should be started.
+ */
+ boolean isStartDetectorEnable();
}
diff --git
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 8af6295743..d3ff7eb45a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -37,6 +37,8 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
private int detectTimeout = 200;
private int detectInterval = 2000;
private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
+
+ private volatile boolean startDetectorEnable = false;
private final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
@@ -80,7 +82,9 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
@Override
public void run() {
try {
- detectByOneRound();
+ if (startDetectorEnable) {
+ detectByOneRound();
+ }
} catch (Exception e) {
log.warn("Unexpected exception raised while detecting
service reachability", e);
}
@@ -137,6 +141,13 @@ public class LatencyFaultToleranceImpl implements
LatencyFaultTolerance<String>
this.faultItemTable.remove(name);
}
+ public boolean isStartDetectorEnable() {
+ return startDetectorEnable;
+ }
+
+ public void setStartDetectorEnable(boolean startDetectorEnable) {
+ this.startDetectorEnable = startDetectorEnable;
+ }
@Override
public String pickOneAtLeast() {
final Enumeration<FaultItem> elements = this.faultItemTable.elements();
diff --git
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index c01490784f..69fb533e5a 100644
---
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -24,8 +24,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
public class MQFaultStrategy {
private LatencyFaultTolerance<String> latencyFaultTolerance;
- private boolean sendLatencyFaultEnable;
- private boolean startDetectorEnable;
+ private volatile boolean sendLatencyFaultEnable;
+ private volatile boolean startDetectorEnable;
private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L,
10000L, 30000L};
@@ -64,11 +64,11 @@ public class MQFaultStrategy {
public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector
serviceDetector) {
- this.setStartDetectorEnable(cc.isStartDetectorEnable());
- this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher,
serviceDetector);
this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
+ this.setStartDetectorEnable(cc.isStartDetectorEnable());
+ this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
}
// For unit test.
@@ -123,21 +123,15 @@ public class MQFaultStrategy {
public void setStartDetectorEnable(boolean startDetectorEnable) {
this.startDetectorEnable = startDetectorEnable;
+ this.latencyFaultTolerance.setStartDetectorEnable(startDetectorEnable);
}
public void startDetector() {
- // user should start the detector
- // and the thread should not be in running state.
- if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
- // start the detector.
- this.latencyFaultTolerance.startDetector();
- }
+ this.latencyFaultTolerance.startDetector();
}
public void shutdown() {
- if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
- this.latencyFaultTolerance.shutdown();
- }
+ this.latencyFaultTolerance.shutdown();
}
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo,
final String lastBrokerName, final boolean resetIndex) {
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
index 8b3c2f7c83..898e529f8c 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
@@ -26,7 +26,6 @@ public class MessageQueueView {
private final MessageQueueSelector readSelector;
private final MessageQueueSelector writeSelector;
private final TopicRouteWrapper topicRouteWrapper;
- private MQFaultStrategy mqFaultStrategy;
public MessageQueueView(String topic, TopicRouteData topicRouteData,
MQFaultStrategy mqFaultStrategy) {
this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
@@ -67,12 +66,4 @@ public class MessageQueueView {
.add("topicRouteWrapper", topicRouteWrapper)
.toString();
}
-
- public MQFaultStrategy getMQFaultStrategy() {
- return mqFaultStrategy;
- }
-
- public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
- this.mqFaultStrategy = mqFaultStrategy;
- }
}
diff --git
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index 74769a4236..caf62a1e02 100644
---
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -127,7 +127,7 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
@Override
public String resolve(String name) {
try {
- String brokerAddr = getBrokerAddr(null, name);
+ String brokerAddr =
getBrokerAddr(ProxyContext.createForInner("MQFaultStrategy"), name);
return brokerAddr;
} catch (Exception e) {
return null;
@@ -175,9 +175,17 @@ public abstract class TopicRouteService extends
AbstractStartAndShutdown {
public void updateFaultItem(final String brokerName, final long
currentLatency, boolean isolation,
boolean reachable) {
+ checkSendFaultToleranceEnable();
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency,
isolation, reachable);
}
+ public void checkSendFaultToleranceEnable() {
+ boolean hotLatencySwitch =
ConfigurationManager.getProxyConfig().isSendLatencyEnable();
+ boolean hotDetectorSwitch =
ConfigurationManager.getProxyConfig().isStartDetectorEnable();
+ this.mqFaultStrategy.setSendLatencyFaultEnable(hotLatencySwitch);
+ this.mqFaultStrategy.setStartDetectorEnable(hotDetectorSwitch);
+ }
+
public MQFaultStrategy getMqFaultStrategy() {
return this.mqFaultStrategy;
}