This is an automated email from the ASF dual-hosted git repository.

yukon pushed a commit to branch ROCKETMQ-311
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit c94fc4fa562ddc20332c3add01b95e291cd325a3
Author: yukon <[email protected]>
AuthorDate: Mon Nov 6 16:29:35 2017 +0800

    [ROCKETMQ-311] Add a switch for broker fast failure and support pull request 
queue
---
 .../rocketmq/broker/latency/BrokerFastFailure.java | 27 +++++++--
 .../broker/latency/BrokerFastFailureTest.java      | 65 ++++++++++++++++++++++
 .../org/apache/rocketmq/common/BrokerConfig.java   | 18 ++++++
 3 files changed, 104 insertions(+), 6 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index 2d4bedc..6aefe81 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.latency;
 
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +28,10 @@ import 
org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * BrokerFastFailure will cover {@link BrokerController#sendThreadPoolQueue} 
and
+ * {@link BrokerController#pullThreadPoolQueue}
+ */
 public class BrokerFastFailure {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     private final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
@@ -52,7 +57,9 @@ public class BrokerFastFailure {
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
             @Override
             public void run() {
-                cleanExpiredRequest();
+                if 
(brokerController.getBrokerConfig().isBrokerFastFailureEnable()) {
+                    cleanExpiredRequest();
+                }
             }
         }, 1000, 10, TimeUnit.MILLISECONDS);
     }
@@ -75,10 +82,18 @@ public class BrokerFastFailure {
             }
         }
 
+        
cleanExpiredRequestInQueue(this.brokerController.getSendThreadPoolQueue(),
+            
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue());
+
+        
cleanExpiredRequestInQueue(this.brokerController.getPullThreadPoolQueue(),
+            
this.brokerController.getBrokerConfig().getWaitTimeMillsInPullQueue());
+    }
+
+    void cleanExpiredRequestInQueue(final BlockingQueue<Runnable> 
blockingQueue, final long maxWaitTimeMillsInQueue) {
         while (true) {
             try {
-                if (!this.brokerController.getSendThreadPoolQueue().isEmpty()) 
{
-                    final Runnable runnable = 
this.brokerController.getSendThreadPoolQueue().peek();
+                if (!blockingQueue.isEmpty()) {
+                    final Runnable runnable = blockingQueue.peek();
                     if (null == runnable) {
                         break;
                     }
@@ -88,10 +103,10 @@ public class BrokerFastFailure {
                     }
 
                     final long behind = System.currentTimeMillis() - 
rt.getCreateTimestamp();
-                    if (behind >= 
this.brokerController.getBrokerConfig().getWaitTimeMillsInSendQueue()) {
-                        if 
(this.brokerController.getSendThreadPoolQueue().remove(runnable)) {
+                    if (behind >= maxWaitTimeMillsInQueue) {
+                        if (blockingQueue.remove(runnable)) {
                             rt.setStopRun(true);
-                            
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, 
String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a 
while, period in queue: %sms, size of queue: %d", behind, 
this.brokerController.getSendThreadPoolQueue().size()));
+                            
rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY, 
String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a 
while, period in queue: %sms, size of queue: %d", behind, 
blockingQueue.size()));
                         }
                     } else {
                         break;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
new file mode 100644
index 0000000..5d0f7f9
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/latency/BrokerFastFailureTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.latency;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class BrokerFastFailureTest {
+    @Test
+    public void testCleanExpiredRequestInQueue() throws Exception {
+        BrokerFastFailure brokerFastFailure = new BrokerFastFailure(null);
+
+        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+        brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
+        assertThat(queue.size()).isZero();
+
+        //Normal Runnable
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+
+            }
+        };
+        queue.add(runnable);
+
+        assertThat(queue.size()).isEqualTo(1);
+        brokerFastFailure.cleanExpiredRequestInQueue(queue, 1);
+        assertThat(queue.size()).isEqualTo(1);
+
+        queue.clear();
+
+        //With expired request
+        RequestTask expiredRequest = new RequestTask(runnable, null, null);
+        queue.add(new FutureTaskExt<>(expiredRequest, null));
+        TimeUnit.MILLISECONDS.sleep(100);
+
+        RequestTask requestTask = new RequestTask(runnable, null, null);
+        queue.add(new FutureTaskExt<>(requestTask, null));
+
+        assertThat(queue.size()).isEqualTo(2);
+        brokerFastFailure.cleanExpiredRequestInQueue(queue, 100);
+        assertThat(queue.size()).isEqualTo(1);
+        assertThat(((FutureTaskExt) 
queue.peek()).getRunnable()).isEqualTo(requestTask);
+    }
+
+}
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9a208a3..a67fa74 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -103,7 +103,9 @@ public class BrokerConfig {
     private boolean disableConsumeIfConsumerReadSlowly = false;
     private long consumerFallbehindThreshold = 1024L * 1024 * 1024 * 16;
 
+    private boolean brokerFastFailureEnable = true;
     private long waitTimeMillsInSendQueue = 200;
+    private long waitTimeMillsInPullQueue = 5 * 1000;
 
     private long startAcceptSendRequestTimeStamp = 0L;
 
@@ -160,6 +162,22 @@ public class BrokerConfig {
         this.consumerFallbehindThreshold = consumerFallbehindThreshold;
     }
 
+    public boolean isBrokerFastFailureEnable() {
+        return brokerFastFailureEnable;
+    }
+
+    public void setBrokerFastFailureEnable(final boolean 
brokerFastFailureEnable) {
+        this.brokerFastFailureEnable = brokerFastFailureEnable;
+    }
+
+    public long getWaitTimeMillsInPullQueue() {
+        return waitTimeMillsInPullQueue;
+    }
+
+    public void setWaitTimeMillsInPullQueue(final long 
waitTimeMillsInPullQueue) {
+        this.waitTimeMillsInPullQueue = waitTimeMillsInPullQueue;
+    }
+
     public boolean isDisableConsumeIfConsumerReadSlowly() {
         return disableConsumeIfConsumerReadSlowly;
     }

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to