Repository: incubator-rocketmq Updated Branches: refs/heads/master 1562bd0d1 -> 6e31d864e
[ROCKETMQ-22] Resolve ClassCastException issue in printWaterMark. Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/6e31d864 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/6e31d864 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/6e31d864 Branch: refs/heads/master Commit: 6e31d864e3f49b1296bad2e24955bee4d918d31d Parents: 1562bd0 Author: yukon <[email protected]> Authored: Mon Jan 9 21:58:38 2017 +0800 Committer: yukon <[email protected]> Committed: Mon Jan 9 21:58:38 2017 +0800 ---------------------------------------------------------------------- .../rocketmq/broker/BrokerController.java | 2 +- .../broker/latency/BrokerFastFailure.java | 4 +- .../longpolling/PullRequestHoldService.java | 4 +- .../broker/processor/PullMessageProcessor.java | 6 +- .../broker/api/BrokerFastFailureTest.java | 61 ++++++++++++++++++++ 5 files changed, 69 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 9b89c85..af69001 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -463,7 +463,7 @@ public class BrokerController { final Runnable peek = q.peek(); if (peek != null) { RequestTask rt = BrokerFastFailure.castRunnable(peek); - slowTimeMills = this.messageStore.now() - rt.getCreateTimestamp(); + slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp(); } if (slowTimeMills < 0) http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java index d7d1276..a2a1aa0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/latency/BrokerFastFailure.java @@ -71,7 +71,7 @@ public class BrokerFastFailure { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } @@ -99,7 +99,7 @@ public class BrokerFastFailure { } else { break; } - } catch (Throwable e) { + } catch (Throwable ignored) { } } } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java index ff068d2..fdba50d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java @@ -130,7 +130,7 @@ public class PullRequestHoldService extends ServiceThread { if (newestOffset > request.getPullFromThisOffset()) { if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tagsCode)) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); @@ -141,7 +141,7 @@ public class PullRequestHoldService extends ServiceThread { if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) { try { - this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(), + this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(request.getClientChannel(), request.getRequestCommand()); } catch (Throwable e) { log.error("execute request when wakeup failed.", e); http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java ---------------------------------------------------------------------- diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java index 382030b..7d15894 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java @@ -50,6 +50,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.exception.RemotingCommandException; import org.apache.rocketmq.remoting.netty.NettyRequestProcessor; +import org.apache.rocketmq.remoting.netty.RequestTask; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.store.GetMessageResult; import org.apache.rocketmq.store.MessageExtBrokerInner; @@ -481,7 +482,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } - public void excuteRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { + public void executeRequestWhenWakeup(final Channel channel, final RemotingCommand request) throws RemotingCommandException { Runnable run = new Runnable() { @Override public void run() { @@ -513,8 +514,7 @@ public class PullMessageProcessor implements NettyRequestProcessor { } } }; - - this.brokerController.getPullMessageExecutor().submit(run); + this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, channel, request)); } public void registerConsumeMessageHook(List<ConsumeMessageHook> sendMessageHookList) { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/6e31d864/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java ---------------------------------------------------------------------- diff --git a/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java new file mode 100644 index 0000000..bec0af5 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/api/BrokerFastFailureTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.api; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.broker.BrokerTestHarness; +import org.apache.rocketmq.broker.latency.BrokerFastFailure; +import org.apache.rocketmq.broker.latency.FutureTaskExt; +import org.apache.rocketmq.remoting.netty.RequestTask; +import org.junit.Assert; +import org.junit.Test; + +public class BrokerFastFailureTest extends BrokerTestHarness { + + @Test + public void testHeadSlowTimeMills() throws InterruptedException { + BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(); + blockingQueue.add(new FutureTaskExt<>(new RequestTask(null, null, null), null)); + TimeUnit.MILLISECONDS.sleep(10); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) > 0); + + blockingQueue.clear(); + blockingQueue.add(new Runnable() { + @Override public void run() { + + } + }); + Assert.assertTrue(this.brokerController.headSlowTimeMills(blockingQueue) == 0); + } + + @Test + public void testCastRunnable() { + Runnable runnable = new Runnable() { + @Override public void run() { + + } + }; + Assert.assertNull(BrokerFastFailure.castRunnable(runnable)); + + RequestTask requestTask = new RequestTask(null, null, null); + runnable = new FutureTaskExt<>(requestTask, null); + + Assert.assertEquals(requestTask, BrokerFastFailure.castRunnable(runnable)); + } +}
