MASTER [ROCKETMQ-14] invoke callback shoule be invoked in an executor rather than in current thread, closes apache/incubator-rocketmq#2
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1356e35f Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1356e35f Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1356e35f Branch: refs/heads/spec Commit: 1356e35f45ebfbca27930fe06e7abd659f111fb4 Parents: 0c022e0 Author: Jaskey <[email protected]> Authored: Tue Dec 27 17:26:05 2016 +0800 Committer: yukon <[email protected]> Committed: Tue Dec 27 17:26:05 2016 +0800 ---------------------------------------------------------------------- .../remoting/netty/NettyRemotingAbstract.java | 67 +++++++++++--------- .../rocketmq/remoting/NettyConnectionTest.java | 52 +++++++++++++++ 2 files changed, 88 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java index 70ae5b5..1c3fdc5 100644 --- a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract { responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { - boolean runInThisThread = false; - ExecutorService executor = this.getCallbackExecutor(); - if (executor != null) { - try { - executor.submit(new Runnable() { - @Override - public void run() { - try { - responseFuture.executeInvokeCallback(); - } catch (Throwable e) { - PLOG.warn("execute callback in executor exception, and callback throw", e); - } - } - }); - } catch (Exception e) { - runInThisThread = true; - PLOG.warn("execute callback in executor exception, maybe executor busy", e); - } - } else { - runInThisThread = true; - } - - if (runInThisThread) { - try { - responseFuture.executeInvokeCallback(); - } catch (Throwable e) { - PLOG.warn("executeInvokeCallback Exception", e); - } - } + executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); } @@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract { } } + //execute callback in callback executor. If callback executor is null, run directly in current thread + private void executeInvokeCallback(final ResponseFuture responseFuture) { + boolean runInThisThread = false; + ExecutorService executor = this.getCallbackExecutor(); + if (executor != null) { + try { + executor.submit(new Runnable() { + @Override + public void run() { + try { + responseFuture.executeInvokeCallback(); + } catch (Throwable e) { + PLOG.warn("execute callback in executor exception, and callback throw", e); + } + } + }); + } catch (Exception e) { + runInThisThread = true; + PLOG.warn("execute callback in executor exception, maybe executor busy", e); + } + } else { + runInThisThread = true; + } + + if (runInThisThread) { + try { + responseFuture.executeInvokeCallback(); + } catch (Throwable e) { + PLOG.warn("executeInvokeCallback Exception", e); + } + } + } + public abstract RPCHook getRPCHook(); abstract public ExecutorService getCallbackExecutor(); @@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract { for (ResponseFuture rf : rfList) { try { - rf.executeInvokeCallback(); + executeInvokeCallback(rf); } catch (Throwable e) { PLOG.warn("scanResponseTable, operationComplete Exception", e); } @@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract { responseFuture.putResponse(null); responseTable.remove(opaque); try { - responseFuture.executeInvokeCallback(); + executeInvokeCallback(responseFuture); } catch (Throwable e) { PLOG.warn("excute callback in writeAndFlush addListener, and callback throw", e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java ---------------------------------------------------------------------- diff --git a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java index e4ff948..755d332 100644 --- a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java +++ b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java @@ -22,9 +22,15 @@ import com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException; import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException; import com.alibaba.rocketmq.remoting.netty.NettyClientConfig; import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient; +import com.alibaba.rocketmq.remoting.netty.ResponseFuture; import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** @@ -51,6 +57,52 @@ public class NettyConnectionTest { System.out.println("-----------------------------------------------------------------"); } + + @Test + public void test_async_timeout() throws InterruptedException, RemotingConnectException, + RemotingSendRequestException, RemotingTimeoutException { + RemotingClient client = createRemotingClient(); + final AtomicInteger ai = new AtomicInteger(0); + final CountDownLatch latch = new CountDownLatch(100); + for(int i=0;i<100;i++) { + try { + RemotingCommand request = RemotingCommand.createRequestCommand(0, null); + client.invokeAsync("localhost:8888", request, 5, new InvokeCallback() {//very easy to timeout + @Override + public void operationComplete(ResponseFuture responseFuture) { + if (responseFuture.isTimeout()) { + if(ai.getAndIncrement()==4) { + try { + System.out.println("First try timeout, blocking 10s" + Thread.currentThread().getName()); + Thread.sleep(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + else{ + System.out.println("Timeout callback execute,very short."+Thread.currentThread().getName()); + } + } + else{ + System.out.println("Success."+Thread.currentThread().getName()); + } + latch.countDown(); + + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + } + + + + latch.await(1000, TimeUnit.MILLISECONDS); + Assert.assertEquals(1, latch.getCount());//only one should be blocked + client.shutdown(); + System.out.println("-----------------------------------------------------------------"); + } + public static RemotingClient createRemotingClient() { NettyClientConfig config = new NettyClientConfig(); config.setClientChannelMaxIdleTimeSeconds(15);
