MASTER [ROCKETMQ-14] invoke callback should be invoked in an executor rather 
than in current thread, closes apache/incubator-rocketmq#2


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/1356e35f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/1356e35f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/1356e35f

Branch: refs/heads/master
Commit: 1356e35f45ebfbca27930fe06e7abd659f111fb4
Parents: 0c022e0
Author: Jaskey <[email protected]>
Authored: Tue Dec 27 17:26:05 2016 +0800
Committer: yukon <[email protected]>
Committed: Tue Dec 27 17:26:05 2016 +0800

----------------------------------------------------------------------
 .../remoting/netty/NettyRemotingAbstract.java   | 67 +++++++++++---------
 .../rocketmq/remoting/NettyConnectionTest.java  | 52 +++++++++++++++
 2 files changed, 88 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 70ae5b5..1c3fdc5 100644
--- 
a/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/rocketmq-remoting/src/main/java/com/alibaba/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -198,35 +198,7 @@ public abstract class NettyRemotingAbstract {
             responseTable.remove(opaque);
 
             if (responseFuture.getInvokeCallback() != null) {
-                boolean runInThisThread = false;
-                ExecutorService executor = this.getCallbackExecutor();
-                if (executor != null) {
-                    try {
-                        executor.submit(new Runnable() {
-                            @Override
-                            public void run() {
-                                try {
-                                    responseFuture.executeInvokeCallback();
-                                } catch (Throwable e) {
-                                    PLOG.warn("execute callback in executor 
exception, and callback throw", e);
-                                }
-                            }
-                        });
-                    } catch (Exception e) {
-                        runInThisThread = true;
-                        PLOG.warn("execute callback in executor exception, 
maybe executor busy", e);
-                    }
-                } else {
-                    runInThisThread = true;
-                }
-
-                if (runInThisThread) {
-                    try {
-                        responseFuture.executeInvokeCallback();
-                    } catch (Throwable e) {
-                        PLOG.warn("executeInvokeCallback Exception", e);
-                    }
-                }
+                executeInvokeCallback(responseFuture);
             } else {
                 responseFuture.putResponse(cmd);
             }
@@ -236,6 +208,39 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    //execute callback in callback executor. If callback executor is null, run 
directly in current thread
+    private void executeInvokeCallback(final ResponseFuture responseFuture) {
+        boolean runInThisThread = false;
+        ExecutorService executor = this.getCallbackExecutor();
+        if (executor != null) {
+            try {
+                executor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            responseFuture.executeInvokeCallback();
+                        } catch (Throwable e) {
+                            PLOG.warn("execute callback in executor exception, 
and callback throw", e);
+                        }
+                    }
+                });
+            } catch (Exception e) {
+                runInThisThread = true;
+                PLOG.warn("execute callback in executor exception, maybe 
executor busy", e);
+            }
+        } else {
+            runInThisThread = true;
+        }
+
+        if (runInThisThread) {
+            try {
+                responseFuture.executeInvokeCallback();
+            } catch (Throwable e) {
+                PLOG.warn("executeInvokeCallback Exception", e);
+            }
+        }
+    }
+
     public abstract RPCHook getRPCHook();
 
     abstract public ExecutorService getCallbackExecutor();
@@ -257,7 +262,7 @@ public abstract class NettyRemotingAbstract {
 
         for (ResponseFuture rf : rfList) {
             try {
-                rf.executeInvokeCallback();
+                executeInvokeCallback(rf);
             } catch (Throwable e) {
                 PLOG.warn("scanResponseTable, operationComplete Exception", e);
             }
@@ -329,7 +334,7 @@ public abstract class NettyRemotingAbstract {
                         responseFuture.putResponse(null);
                         responseTable.remove(opaque);
                         try {
-                            responseFuture.executeInvokeCallback();
+                            executeInvokeCallback(responseFuture);
                         } catch (Throwable e) {
                             PLOG.warn("excute callback in writeAndFlush 
addListener, and callback throw", e);
                         } finally {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/1356e35f/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
 
b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
index e4ff948..755d332 100644
--- 
a/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
+++ 
b/rocketmq-remoting/src/test/java/com/alibaba/rocketmq/remoting/NettyConnectionTest.java
@@ -22,9 +22,15 @@ import 
com.alibaba.rocketmq.remoting.exception.RemotingSendRequestException;
 import com.alibaba.rocketmq.remoting.exception.RemotingTimeoutException;
 import com.alibaba.rocketmq.remoting.netty.NettyClientConfig;
 import com.alibaba.rocketmq.remoting.netty.NettyRemotingClient;
+import com.alibaba.rocketmq.remoting.netty.ResponseFuture;
 import com.alibaba.rocketmq.remoting.protocol.RemotingCommand;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 
 /**
 
@@ -51,6 +57,52 @@ public class NettyConnectionTest {
         
System.out.println("-----------------------------------------------------------------");
     }
 
+
+    @Test
+    public void test_async_timeout() throws InterruptedException, 
RemotingConnectException,
+            RemotingSendRequestException, RemotingTimeoutException {
+        RemotingClient client = createRemotingClient();
+        final AtomicInteger ai = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(100);
+        for(int i=0;i<100;i++) {
+            try {
+                RemotingCommand request = 
RemotingCommand.createRequestCommand(0, null);
+                client.invokeAsync("localhost:8888", request, 5, new 
InvokeCallback() {//very easy to timeout
+                    @Override
+                    public void operationComplete(ResponseFuture 
responseFuture) {
+                        if (responseFuture.isTimeout()) {
+                            if(ai.getAndIncrement()==4) {
+                                try {
+                                    System.out.println("First try timeout,  
blocking 10s" + Thread.currentThread().getName());
+                                    Thread.sleep(10 * 1000);
+                                } catch (InterruptedException e) {
+                                    e.printStackTrace();
+                                }
+                            }
+                            else{
+                                System.out.println("Timeout callback 
execute,very short."+Thread.currentThread().getName());
+                            }
+                        }
+                        else{
+                            
System.out.println("Success."+Thread.currentThread().getName());
+                        }
+                        latch.countDown();
+
+                    }
+                });
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+
+
+        latch.await(1000, TimeUnit.MILLISECONDS);
+        Assert.assertEquals(1, latch.getCount());//only one should be blocked
+        client.shutdown();
+        
System.out.println("-----------------------------------------------------------------");
+    }
+
     public static RemotingClient createRemotingClient() {
         NettyClientConfig config = new NettyClientConfig();
         config.setClientChannelMaxIdleTimeSeconds(15);

Reply via email to