vongosling closed pull request #95: [ROCKETMQ-184]-It takes too long(3-33 
seconds) to switch to read from slave when master crashes
URL: https://github.com/apache/rocketmq/pull/95
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index bf019618d..c13e75c20 100644
--- 
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ 
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -162,7 +162,7 @@ public void testSendMessageAsync_Success() throws 
RemotingException, Interrupted
             public Object answer(InvocationOnMock mock) throws Throwable {
                 InvokeCallback callback = mock.getArgument(3);
                 RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new 
ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
+                ResponseFuture responseFuture = new 
ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
                 
responseFuture.setResponseCommand(createSuccessResponse(request));
                 callback.operationComplete(responseFuture);
                 return null;
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 06918086f..664c5fd86 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -357,7 +357,7 @@ public RemotingCommand invokeSyncImpl(final Channel 
channel, final RemotingComma
         final int opaque = request.getOpaque();
 
         try {
-            final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis, null, null);
+            final ResponseFuture responseFuture = new ResponseFuture(channel, 
opaque, timeoutMillis, null, null);
             this.responseTable.put(opaque, responseFuture);
             final SocketAddress addr = channel.remoteAddress();
             channel.writeAndFlush(request).addListener(new 
ChannelFutureListener() {
@@ -400,8 +400,7 @@ public void invokeAsyncImpl(final Channel channel, final 
RemotingCommand request
         boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, 
TimeUnit.MILLISECONDS);
         if (acquired) {
             final SemaphoreReleaseOnlyOnce once = new 
SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-
-            final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis, invokeCallback, once);
+            final ResponseFuture responseFuture = new ResponseFuture(channel, 
opaque, timeoutMillis, invokeCallback, once);
             this.responseTable.put(opaque, responseFuture);
             try {
                 channel.writeAndFlush(request).addListener(new 
ChannelFutureListener() {
@@ -410,20 +409,8 @@ public void operationComplete(ChannelFuture f) throws 
Exception {
                         if (f.isSuccess()) {
                             responseFuture.setSendRequestOK(true);
                             return;
-                        } else {
-                            responseFuture.setSendRequestOK(false);
-                        }
-
-                        responseFuture.putResponse(null);
-                        responseTable.remove(opaque);
-                        try {
-                            executeInvokeCallback(responseFuture);
-                        } catch (Throwable e) {
-                            log.warn("excute callback in writeAndFlush 
addListener, and callback throw", e);
-                        } finally {
-                            responseFuture.release();
                         }
-
+                        requestFail(opaque);
                         log.warn("send a request command to channel <{}> 
failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                     }
                 });
@@ -448,6 +435,38 @@ public void operationComplete(ChannelFuture f) throws 
Exception {
         }
     }
 
+    private void requestFail(final int opaque) {
+        ResponseFuture responseFuture = responseTable.remove(opaque);
+        if (responseFuture != null) {
+            responseFuture.setSendRequestOK(false);
+            responseFuture.putResponse(null);
+            try {
+                executeInvokeCallback(responseFuture);
+            } catch (Throwable e) {
+                log.warn("execute callback in requestFail, and callback 
throw", e);
+            } finally {
+                responseFuture.release();
+            }
+        }
+    }
+
+    /**
+     * mark the request of the specified channel as fail and to invoke fail 
callback immediately
+     * @param channel the channel which is close already
+     */
+    protected void failFast(final Channel channel) {
+        Iterator<Entry<Integer, ResponseFuture>> it = 
responseTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<Integer, ResponseFuture> entry = it.next();
+            if (entry.getValue().getProcessChannel() == channel) {
+                Integer opaque = entry.getKey();
+                if (opaque != null) {
+                    requestFail(opaque);
+                }
+            }
+        }
+    }
+
     public void invokeOnewayImpl(final Channel channel, final RemotingCommand 
request, final long timeoutMillis)
         throws InterruptedException, RemotingTooMuchRequestException, 
RemotingTimeoutException, RemotingSendRequestException {
         request.markOnewayRPC();
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index a96423c1f..d08bdd86d 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -650,7 +650,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise 
promise) throws Exce
             log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
             closeChannel(ctx.channel());
             super.close(ctx, promise);
-
+            NettyRemotingClient.this.failFast(ctx.channel());
             if (NettyRemotingClient.this.channelEventListener != null) {
                 NettyRemotingClient.this.putNettyEvent(new 
NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
             }
diff --git 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 1157c4502..5f4c8c695 100644
--- 
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++ 
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.remoting.netty;
 
+import io.netty.channel.Channel;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +26,7 @@
 
 public class ResponseFuture {
     private final int opaque;
+    private final Channel processChannel;
     private final long timeoutMillis;
     private final InvokeCallback invokeCallback;
     private final long beginTimestamp = System.currentTimeMillis();
@@ -37,9 +39,10 @@
     private volatile boolean sendRequestOK = true;
     private volatile Throwable cause;
 
-    public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback 
invokeCallback,
+    public ResponseFuture(Channel channel, int opaque, long timeoutMillis, 
InvokeCallback invokeCallback,
         SemaphoreReleaseOnlyOnce once) {
         this.opaque = opaque;
+        this.processChannel = channel;
         this.timeoutMillis = timeoutMillis;
         this.invokeCallback = invokeCallback;
         this.once = once;
@@ -114,11 +117,20 @@ public int getOpaque() {
         return opaque;
     }
 
+    public Channel getProcessChannel() {
+        return processChannel;
+    }
+
     @Override
     public String toString() {
-        return "ResponseFuture [responseCommand=" + responseCommand + ", 
sendRequestOK=" + sendRequestOK
-            + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" + 
timeoutMillis
-            + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" + 
beginTimestamp
+        return "ResponseFuture [responseCommand=" + responseCommand
+            + ", sendRequestOK=" + sendRequestOK
+            + ", cause=" + cause
+            + ", opaque=" + opaque
+            + ", processChannel=" + processChannel
+            + ", timeoutMillis=" + timeoutMillis
+            + ", invokeCallback=" + invokeCallback
+            + ", beginTimestamp=" + beginTimestamp
             + ", countDownLatch=" + countDownLatch + "]";
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to