Repository: incubator-rocketmq
Updated Branches:
  refs/heads/master 1562bd0d1 -> 6e31d864e


[ROCKETMQ-22] Resolve ClassCastException issue in printWaterMark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6e31d864
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6e31d864
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6e31d864

Branch: refs/heads/master
Commit: 6e31d864e3f49b1296bad2e24955bee4d918d31d
Parents: 1562bd0
Author: yukon <[email protected]>
Authored: Mon Jan 9 21:58:38 2017 +0800
Committer: yukon <[email protected]>
Committed: Mon Jan 9 21:58:38 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/broker/BrokerController.java       |  2 +-
 .../broker/latency/BrokerFastFailure.java       |  4 +-
 .../longpolling/PullRequestHoldService.java     |  4 +-
 .../broker/processor/PullMessageProcessor.java  |  6 +-
 .../broker/api/BrokerFastFailureTest.java       | 61 ++++++++++++++++++++
 5 files changed, 69 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java 
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 9b89c85..af69001 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -463,7 +463,7 @@ public class BrokerController {
         final Runnable peek = q.peek();
         if (peek != null) {
             RequestTask rt = BrokerFastFailure.castRunnable(peek);
-            slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp();
+            slowTimeMills = rt == null ? 0 : this.messageStore.now() - 
rt.getCreateTimestamp();
         }
 
         if (slowTimeMills < 0)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
index d7d1276..a2a1aa0 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java
@@ -71,7 +71,7 @@ public class BrokerFastFailure {
                 } else {
                     break;
                 }
-            } catch (Throwable e) {
+            } catch (Throwable ignored) {
             }
         }
 
@@ -99,7 +99,7 @@ public class BrokerFastFailure {
                 } else {
                     break;
                 }
-            } catch (Throwable e) {
+            } catch (Throwable ignored) {
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
index ff068d2..fdba50d 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java
@@ -130,7 +130,7 @@ public class PullRequestHoldService extends ServiceThread {
                     if (newestOffset > request.getPullFromThisOffset()) {
                         if 
(this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) {
                             try {
-                                
this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
+                                
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                     request.getRequestCommand());
                             } catch (Throwable e) {
                                 log.error("execute request when wakeup 
failed.", e);
@@ -141,7 +141,7 @@ public class PullRequestHoldService extends ServiceThread {
 
                     if (System.currentTimeMillis() >= 
(request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                         try {
-                            
this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
+                            
this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(),
                                 request.getRequestCommand());
                         } catch (Throwable e) {
                             log.error("execute request when wakeup failed.", 
e);

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
----------------------------------------------------------------------
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 382030b..7d15894 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@ -50,6 +50,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.netty.RequestTask;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 import org.apache.rocketmq.store.GetMessageResult;
 import org.apache.rocketmq.store.MessageExtBrokerInner;
@@ -481,7 +482,7 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
         }
     }
 
-    public void excuteRequestWhenWakeup(final Channel channel, final 
RemotingCommand request) throws RemotingCommandException {
+    public void executeRequestWhenWakeup(final Channel channel, final 
RemotingCommand request) throws RemotingCommandException {
         Runnable run = new Runnable() {
             @Override
             public void run() {
@@ -513,8 +514,7 @@ public class PullMessageProcessor implements 
NettyRequestProcessor {
                 }
             }
         };
-
-        this.brokerController.getPullMessageExecutor().submit(run);
+        this.brokerController.getPullMessageExecutor().submit(new 
RequestTask(run, channel, request));
     }
 
     public void registerConsumeMessageHook(List<ConsumeMessageHook> 
sendMessageHookList) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java
----------------------------------------------------------------------
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java
new file mode 100644
index 0000000..bec0af5
--- /dev/null
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.api;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.broker.BrokerTestHarness;
+import org.apache.rocketmq.broker.latency.BrokerFastFailure;
+import org.apache.rocketmq.broker.latency.FutureTaskExt;
+import org.apache.rocketmq.remoting.netty.RequestTask;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BrokerFastFailureTest extends BrokerTestHarness {
+
+    @Test
+    public void testHeadSlowTimeMills() throws InterruptedException {
+        BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>();
+        blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, 
null), null));
+        TimeUnit.MILLISECONDS.sleep(10);
+        
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0);
+
+        blockingQueue.clear();
+        blockingQueue.add(new Runnable() {
+            @Override public void run() {
+
+            }
+        });
+        
Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0);
+    }
+
+    @Test
+    public void testCastRunnable() {
+        Runnable runnable = new Runnable() {
+            @Override public void run() {
+
+            }
+        };
+        Assert.assertNull(BrokerFastFailure.castRunnable(runnable));
+
+        RequestTask requestTask = new RequestTask(null, null, null);
+        runnable = new FutureTaskExt<>(requestTask, null);
+
+        Assert.assertEquals(requestTask, 
BrokerFastFailure.castRunnable(runnable));
+    }
+}

Reply via email to