vongosling closed pull request #95: [ROCKETMQ-184]-It takes too long(3-33
seconds) to switch to read from slave when master crashes
URL: https://github.com/apache/rocketmq/pull/95
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index bf019618d..c13e75c20 100644
---
a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++
b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -162,7 +162,7 @@ public void testSendMessageAsync_Success() throws
RemotingException, Interrupted
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
- ResponseFuture responseFuture = new
ResponseFuture(request.getOpaque(), 3 * 1000, null, null);
+ ResponseFuture responseFuture = new
ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 06918086f..664c5fd86 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -357,7 +357,7 @@ public RemotingCommand invokeSyncImpl(final Channel
channel, final RemotingComma
final int opaque = request.getOpaque();
try {
- final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis, null, null);
+ final ResponseFuture responseFuture = new ResponseFuture(channel,
opaque, timeoutMillis, null, null);
this.responseTable.put(opaque, responseFuture);
final SocketAddress addr = channel.remoteAddress();
channel.writeAndFlush(request).addListener(new
ChannelFutureListener() {
@@ -400,8 +400,7 @@ public void invokeAsyncImpl(final Channel channel, final
RemotingCommand request
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis,
TimeUnit.MILLISECONDS);
if (acquired) {
final SemaphoreReleaseOnlyOnce once = new
SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
-
- final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis, invokeCallback, once);
+ final ResponseFuture responseFuture = new ResponseFuture(channel,
opaque, timeoutMillis, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
try {
channel.writeAndFlush(request).addListener(new
ChannelFutureListener() {
@@ -410,20 +409,8 @@ public void operationComplete(ChannelFuture f) throws
Exception {
if (f.isSuccess()) {
responseFuture.setSendRequestOK(true);
return;
- } else {
- responseFuture.setSendRequestOK(false);
- }
-
- responseFuture.putResponse(null);
- responseTable.remove(opaque);
- try {
- executeInvokeCallback(responseFuture);
- } catch (Throwable e) {
- log.warn("excute callback in writeAndFlush
addListener, and callback throw", e);
- } finally {
- responseFuture.release();
}
-
+ requestFail(opaque);
log.warn("send a request command to channel <{}>
failed.", RemotingHelper.parseChannelRemoteAddr(channel));
}
});
@@ -448,6 +435,38 @@ public void operationComplete(ChannelFuture f) throws
Exception {
}
}
+ private void requestFail(final int opaque) {
+ ResponseFuture responseFuture = responseTable.remove(opaque);
+ if (responseFuture != null) {
+ responseFuture.setSendRequestOK(false);
+ responseFuture.putResponse(null);
+ try {
+ executeInvokeCallback(responseFuture);
+ } catch (Throwable e) {
+ log.warn("execute callback in requestFail, and callback
throw", e);
+ } finally {
+ responseFuture.release();
+ }
+ }
+ }
+
+ /**
+ * mark the request of the specified channel as fail and to invoke fail
callback immediately
+ * @param channel the channel which is close already
+ */
+ protected void failFast(final Channel channel) {
+ Iterator<Entry<Integer, ResponseFuture>> it =
responseTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<Integer, ResponseFuture> entry = it.next();
+ if (entry.getValue().getProcessChannel() == channel) {
+ Integer opaque = entry.getKey();
+ if (opaque != null) {
+ requestFail(opaque);
+ }
+ }
+ }
+ }
+
public void invokeOnewayImpl(final Channel channel, final RemotingCommand
request, final long timeoutMillis)
throws InterruptedException, RemotingTooMuchRequestException,
RemotingTimeoutException, RemotingSendRequestException {
request.markOnewayRPC();
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index a96423c1f..d08bdd86d 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -650,7 +650,7 @@ public void close(ChannelHandlerContext ctx, ChannelPromise
promise) throws Exce
log.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
closeChannel(ctx.channel());
super.close(ctx, promise);
-
+ NettyRemotingClient.this.failFast(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new
NettyEvent(NettyEventType.CLOSE, remoteAddress, ctx.channel()));
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
index 1157c4502..5f4c8c695 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/ResponseFuture.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.remoting.netty;
+import io.netty.channel.Channel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +26,7 @@
public class ResponseFuture {
private final int opaque;
+ private final Channel processChannel;
private final long timeoutMillis;
private final InvokeCallback invokeCallback;
private final long beginTimestamp = System.currentTimeMillis();
@@ -37,9 +39,10 @@
private volatile boolean sendRequestOK = true;
private volatile Throwable cause;
- public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback
invokeCallback,
+ public ResponseFuture(Channel channel, int opaque, long timeoutMillis,
InvokeCallback invokeCallback,
SemaphoreReleaseOnlyOnce once) {
this.opaque = opaque;
+ this.processChannel = channel;
this.timeoutMillis = timeoutMillis;
this.invokeCallback = invokeCallback;
this.once = once;
@@ -114,11 +117,20 @@ public int getOpaque() {
return opaque;
}
+ public Channel getProcessChannel() {
+ return processChannel;
+ }
+
@Override
public String toString() {
- return "ResponseFuture [responseCommand=" + responseCommand + ",
sendRequestOK=" + sendRequestOK
- + ", cause=" + cause + ", opaque=" + opaque + ", timeoutMillis=" +
timeoutMillis
- + ", invokeCallback=" + invokeCallback + ", beginTimestamp=" +
beginTimestamp
+ return "ResponseFuture [responseCommand=" + responseCommand
+ + ", sendRequestOK=" + sendRequestOK
+ + ", cause=" + cause
+ + ", opaque=" + opaque
+ + ", processChannel=" + processChannel
+ + ", timeoutMillis=" + timeoutMillis
+ + ", invokeCallback=" + invokeCallback
+ + ", beginTimestamp=" + beginTimestamp
+ ", countDownLatch=" + countDownLatch + "]";
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services