This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6fd0073d64 [ISSUE #7319] Optimize fault-tolerant mechanism for sending 
messages and hot update switch (#7320)
6fd0073d64 is described below

commit 6fd0073d6475c539e8f4c30dc4f104a56a21d724
Author: Ji Juntao <[email protected]>
AuthorDate: Thu Sep 7 20:21:16 2023 +0800

    [ISSUE #7319] Optimize fault-tolerant mechanism for sending messages and 
hot update switch (#7320)
---
 .../client/impl/producer/DefaultMQProducerImpl.java  |  8 ++------
 .../client/latency/LatencyFaultTolerance.java        | 14 ++++++++++++++
 .../client/latency/LatencyFaultToleranceImpl.java    | 13 ++++++++++++-
 .../rocketmq/client/latency/MQFaultStrategy.java     | 20 +++++++-------------
 .../proxy/service/route/MessageQueueView.java        |  9 ---------
 .../proxy/service/route/TopicRouteService.java       | 10 +++++++++-
 6 files changed, 44 insertions(+), 30 deletions(-)

diff --git 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 2d6b83ac2c..b0c212e46b 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -263,9 +263,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                     mQClientFactory.start();
                 }
 
-                if (this.mqFaultStrategy.isStartDetectorEnable()) {
-                    this.mqFaultStrategy.startDetector();
-                }
+                this.mqFaultStrategy.startDetector();
 
                 log.info("the producer [{}] start OK. 
sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                     this.defaultMQProducer.isSendMessageWithVIPChannel());
@@ -311,9 +309,7 @@ public class DefaultMQProducerImpl implements 
MQProducerInner {
                 if (shutdownFactory) {
                     this.mQClientFactory.shutdown();
                 }
-                if (this.mqFaultStrategy.isStartDetectorEnable()) {
-                    this.mqFaultStrategy.shutdown();
-                }
+                this.mqFaultStrategy.shutdown();
                 RequestFutureHolder.getInstance().shutdown(this);
                 log.info("the producer [{}] shutdown OK", 
this.defaultMQProducer.getProducerGroup());
                 this.serviceState = ServiceState.SHUTDOWN_ALREADY;
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
 
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
index 72d2f34500..17aaa266aa 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultTolerance.java
@@ -89,4 +89,18 @@ public interface LatencyFaultTolerance<T> {
      * @param detectInterval each broker's detecting interval
      */
     void setDetectInterval(final int detectInterval);
+
+    /**
+     * Use it to enable or disable the detector.
+     *
+     * @param startDetectorEnable set the detector's work status
+     */
+    void setStartDetectorEnable(final boolean startDetectorEnable);
+
+    /**
+     * Use it to check whether the detector is enabled.
+     *
+     * @return whether the detector should be started.
+     */
+    boolean isStartDetectorEnable();
 }
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
 
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
index 8af6295743..d3ff7eb45a 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/latency/LatencyFaultToleranceImpl.java
@@ -37,6 +37,8 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
     private int detectTimeout = 200;
     private int detectInterval = 2000;
     private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();
+
+    private volatile boolean startDetectorEnable = false;
     private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
         @Override
         public Thread newThread(Runnable r) {
@@ -80,7 +82,9 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
             @Override
             public void run() {
                 try {
-                    detectByOneRound();
+                    if (startDetectorEnable) {
+                        detectByOneRound();
+                    }
                 } catch (Exception e) {
                     log.warn("Unexpected exception raised while detecting 
service reachability", e);
                 }
@@ -137,6 +141,13 @@ public class LatencyFaultToleranceImpl implements 
LatencyFaultTolerance<String>
         this.faultItemTable.remove(name);
     }
 
+    public boolean isStartDetectorEnable() {
+        return startDetectorEnable;
+    }
+
+    public void setStartDetectorEnable(boolean startDetectorEnable) {
+        this.startDetectorEnable = startDetectorEnable;
+    }
     @Override
     public String pickOneAtLeast() {
         final Enumeration<FaultItem> elements = this.faultItemTable.elements();
diff --git 
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java 
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
index c01490784f..69fb533e5a 100644
--- 
a/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
+++ 
b/client/src/main/java/org/apache/rocketmq/client/latency/MQFaultStrategy.java
@@ -24,8 +24,8 @@ import org.apache.rocketmq.common.message.MessageQueue;
 
 public class MQFaultStrategy {
     private LatencyFaultTolerance<String> latencyFaultTolerance;
-    private boolean sendLatencyFaultEnable;
-    private boolean startDetectorEnable;
+    private volatile boolean sendLatencyFaultEnable;
+    private volatile boolean startDetectorEnable;
     private long[] latencyMax = {50L, 100L, 550L, 1800L, 3000L, 5000L, 15000L};
     private long[] notAvailableDuration = {0L, 0L, 2000L, 5000L, 6000L, 
10000L, 30000L};
 
@@ -64,11 +64,11 @@ public class MQFaultStrategy {
 
 
     public MQFaultStrategy(ClientConfig cc, Resolver fetcher, ServiceDetector 
serviceDetector) {
-        this.setStartDetectorEnable(cc.isStartDetectorEnable());
-        this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
         this.latencyFaultTolerance = new LatencyFaultToleranceImpl(fetcher, 
serviceDetector);
         this.latencyFaultTolerance.setDetectInterval(cc.getDetectInterval());
         this.latencyFaultTolerance.setDetectTimeout(cc.getDetectTimeout());
+        this.setStartDetectorEnable(cc.isStartDetectorEnable());
+        this.setSendLatencyFaultEnable(cc.isSendLatencyEnable());
     }
 
     // For unit test.
@@ -123,21 +123,15 @@ public class MQFaultStrategy {
 
     public void setStartDetectorEnable(boolean startDetectorEnable) {
         this.startDetectorEnable = startDetectorEnable;
+        this.latencyFaultTolerance.setStartDetectorEnable(startDetectorEnable);
     }
 
     public void startDetector() {
-        // user should start the detector
-        // and the thread should not be in running state.
-        if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
-            // start the detector.
-            this.latencyFaultTolerance.startDetector();
-        }
+        this.latencyFaultTolerance.startDetector();
     }
 
     public void shutdown() {
-        if (this.sendLatencyFaultEnable && this.startDetectorEnable) {
-            this.latencyFaultTolerance.shutdown();
-        }
+        this.latencyFaultTolerance.shutdown();
     }
 
     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, 
final String lastBrokerName, final boolean resetIndex) {
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
index 8b3c2f7c83..898e529f8c 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/MessageQueueView.java
@@ -26,7 +26,6 @@ public class MessageQueueView {
     private final MessageQueueSelector readSelector;
     private final MessageQueueSelector writeSelector;
     private final TopicRouteWrapper topicRouteWrapper;
-    private MQFaultStrategy mqFaultStrategy;
 
     public MessageQueueView(String topic, TopicRouteData topicRouteData, 
MQFaultStrategy mqFaultStrategy) {
         this.topicRouteWrapper = new TopicRouteWrapper(topicRouteData, topic);
@@ -67,12 +66,4 @@ public class MessageQueueView {
             .add("topicRouteWrapper", topicRouteWrapper)
             .toString();
     }
-
-    public MQFaultStrategy getMQFaultStrategy() {
-        return mqFaultStrategy;
-    }
-
-    public void setMQFaultStrategy(MQFaultStrategy mqFaultStrategy) {
-        this.mqFaultStrategy = mqFaultStrategy;
-    }
 }
diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
index 74769a4236..caf62a1e02 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/service/route/TopicRouteService.java
@@ -127,7 +127,7 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
             @Override
             public String resolve(String name) {
                 try {
-                    String brokerAddr = getBrokerAddr(null, name);
+                    String brokerAddr = 
getBrokerAddr(ProxyContext.createForInner("MQFaultStrategy"), name);
                     return brokerAddr;
                 } catch (Exception e) {
                     return null;
@@ -175,9 +175,17 @@ public abstract class TopicRouteService extends 
AbstractStartAndShutdown {
 
     public void updateFaultItem(final String brokerName, final long 
currentLatency, boolean isolation,
                                 boolean reachable) {
+        checkSendFaultToleranceEnable();
         this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, 
isolation, reachable);
     }
 
+    public void checkSendFaultToleranceEnable() {
+        boolean hotLatencySwitch = 
ConfigurationManager.getProxyConfig().isSendLatencyEnable();
+        boolean hotDetectorSwitch = 
ConfigurationManager.getProxyConfig().isStartDetectorEnable();
+        this.mqFaultStrategy.setSendLatencyFaultEnable(hotLatencySwitch);
+        this.mqFaultStrategy.setStartDetectorEnable(hotDetectorSwitch);
+    }
+
     public MQFaultStrategy getMqFaultStrategy() {
         return this.mqFaultStrategy;
     }

Reply via email to