This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit cbd0e46663778f6ab243620df41baefce2f1bbdb Author: yukon <[email protected]> AuthorDate: Fri May 17 14:47:01 2019 +0800 Polish interceptor and async handler --- .../apache/rocketmq/remoting/api/AsyncHandler.java | 6 +- .../rocketmq/remoting/api/RemotingClient.java | 2 +- .../rocketmq/remoting/api/RemotingServer.java | 5 +- .../rocketmq/remoting/api/RemotingService.java | 9 +- .../remoting/api/interceptor/ExceptionContext.java | 76 ------ .../remoting/api/interceptor/Interceptor.java | 6 +- .../remoting/api/interceptor/InterceptorGroup.java | 8 +- .../remoting/api/interceptor/ResponseContext.java | 24 -- .../{ResponseResult.java => ResponseFuture.java} | 57 ++--- .../remoting/impl/netty/NettyRemotingAbstract.java | 267 ++++++++++----------- .../remoting/impl/netty/NettyRemotingClient.java | 14 +- .../remoting/impl/netty/NettyRemotingServer.java | 5 +- 12 files changed, 154 insertions(+), 325 deletions(-) diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java index 106431b..322503c 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/AsyncHandler.java @@ -26,9 +26,7 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommand; * @since 1.0.0 */ public interface AsyncHandler { - void onFailure(RemotingCommand command); + void onFailure(RemotingCommand request, Throwable cause); - void onSuccess(RemotingCommand command); - - void onTimeout(long costTimeMillis, long timeoutMillis); + void onSuccess(RemotingCommand response); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java index 1603af4..ee67272 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingClient.java @@ -24,5 +24,5 @@ public interface RemotingClient extends RemotingService { void invokeAsync(String address, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis); - void invokeOneWay(String address, RemotingCommand request, long timeoutMillis); + void invokeOneWay(String address, RemotingCommand request); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java index f36c83c..785f83e 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java @@ -25,8 +25,7 @@ public interface RemotingServer extends RemotingService { RemotingCommand invoke(RemotingChannel remotingChannel, RemotingCommand request, long timeoutMillis); - void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler, - long timeoutMillis); + void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis); - void invokeOneWay(RemotingChannel remotingChannel, RemotingCommand request, long timeoutMillis); + void invokeOneWay(RemotingChannel remotingChannel, RemotingCommand request); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java index 57f3743..9cb59b4 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingService.java @@ -25,14 +25,13 @@ import org.apache.rocketmq.remoting.common.Pair; public interface RemotingService extends ConnectionService, ObjectLifecycle { void registerInterceptor(Interceptor interceptor); - void registerRequestProcessor(final short requestCode, final RequestProcessor processor, - final ExecutorService executor); + void registerRequestProcessor(short requestCode, RequestProcessor processor, ExecutorService executor); - void registerRequestProcessor(final short requestCode, final RequestProcessor processor); + void registerRequestProcessor(short requestCode, RequestProcessor processor); - void unregisterRequestProcessor(final short requestCode); + void unregisterRequestProcessor(short requestCode); - Pair<RequestProcessor, ExecutorService> processor(final short requestCode); + Pair<RequestProcessor, ExecutorService> processor(short requestCode); String remotingInstanceId(); diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java deleted file mode 100644 index 2452309..0000000 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ExceptionContext.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.api.interceptor; - -import org.apache.rocketmq.remoting.api.RemotingEndPoint; -import org.apache.rocketmq.remoting.api.command.RemotingCommand; - -public class ExceptionContext extends RequestContext { - private Throwable exception; - private String remark; - - public ExceptionContext(RemotingEndPoint remotingEndPoint, String remoteAddr, RemotingCommand request, - Throwable exception, String remark) { - super(remotingEndPoint, remoteAddr, request); - this.remotingEndPoint = remotingEndPoint; - this.remoteAddr = remoteAddr; - this.request = request; - this.exception = exception; - this.remark = remark; - } - - public RemotingEndPoint getRemotingEndPoint() { - return remotingEndPoint; - } - - public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) { - this.remotingEndPoint = remotingEndPoint; - } - - public String getRemoteAddr() { - return remoteAddr; - } - - public void setRemoteAddr(String remoteAddr) { - this.remoteAddr = remoteAddr; - } - - public RemotingCommand getRequest() { - return request; - } - - public void setRequest(RemotingCommand request) { - this.request = request; - } - - public Throwable getException() { - return exception; - } - - public void setException(Throwable exception) { - this.exception = exception; - } - - public String getRemark() { - return remark; - } - - public void setRemark(String remark) { - this.remark = remark; - } -} diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java index 62257ef..98a04cb 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/Interceptor.java @@ -18,9 +18,7 @@ package org.apache.rocketmq.remoting.api.interceptor; public interface Interceptor { - void beforeRequest(final RequestContext context); + void beforeRequest(RequestContext context); - void afterResponseReceived(final ResponseContext context); - - void onException(final ExceptionContext context); + void afterResponseReceived(ResponseContext context); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java index 9ffc696..e7baeed 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/InterceptorGroup.java @@ -21,7 +21,7 @@ import java.util.ArrayList; import java.util.List; public class InterceptorGroup { - private final List<Interceptor> interceptors = new ArrayList<Interceptor>(); + private final List<Interceptor> interceptors = new ArrayList<>(); public void registerInterceptor(final Interceptor interceptor) { if (interceptor != null) { @@ -40,10 +40,4 @@ public class InterceptorGroup { interceptor.afterResponseReceived(context); } } - - public void onException(final ExceptionContext context) { - for (Interceptor interceptor : interceptors) { - interceptor.onException(context); - } - } } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java index c7f7a9b..005aa28 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java @@ -34,30 +34,6 @@ public class ResponseContext extends RequestContext { this.response = response; } - public RemotingEndPoint getRemotingEndPoint() { - return remotingEndPoint; - } - - public void setRemotingEndPoint(RemotingEndPoint remotingEndPoint) { - this.remotingEndPoint = remotingEndPoint; - } - - public String getRemoteAddr() { - return remoteAddr; - } - - public void setRemoteAddr(String remoteAddr) { - this.remoteAddr = remoteAddr; - } - - public RemotingCommand getRequest() { - return request; - } - - public void setRequest(RemotingCommand request) { - this.request = request; - } - @Override public String toString() { return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java similarity index 68% rename from remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java rename to remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java index 92f501f..e614963 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseResult.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java @@ -23,16 +23,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.rocketmq.remoting.api.AsyncHandler; -import org.apache.rocketmq.remoting.api.RemotingEndPoint; import org.apache.rocketmq.remoting.api.command.RemotingCommand; -import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext; import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; -import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; +import org.jetbrains.annotations.Nullable; -public class ResponseResult { +public class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); private final CountDownLatch countDownLatch = new CountDownLatch(1); - private final AtomicBoolean interceptorExecuted = new AtomicBoolean(false); + private final AtomicBoolean asyncHandlerExecuted = new AtomicBoolean(false); private int requestId; private long timeoutMillis; @@ -47,54 +45,27 @@ public class ResponseResult { private InterceptorGroup interceptorGroup; private String remoteAddr; - public ResponseResult(int requestId, long timeoutMillis, AsyncHandler asyncHandler, SemaphoreReleaseOnlyOnce once) { + public ResponseFuture(int requestId, long timeoutMillis, AsyncHandler asyncHandler, @Nullable SemaphoreReleaseOnlyOnce once) { this.requestId = requestId; this.timeoutMillis = timeoutMillis; this.asyncHandler = asyncHandler; this.once = once; } - public ResponseResult(int requestId, long timeoutMillis) { + public ResponseFuture(int requestId, long timeoutMillis) { this.requestId = requestId; this.timeoutMillis = timeoutMillis; } - public void executeRequestSendFailed() { - if (this.interceptorExecuted.compareAndSet(false, true)) { - try { - interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand, - cause, "REQUEST_SEND_FAILED")); - } catch (Throwable e) { - } - //Sync call - if (null != asyncHandler) { - asyncHandler.onFailure(requestCommand); - } - } - } - - public void executeCallbackArrived(final RemotingCommand response) { - if (this.interceptorExecuted.compareAndSet(false, true)) { - try { - interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, this.remoteAddr, - this.requestCommand, response)); - } catch (Throwable e) { - } - if (null != asyncHandler) { - asyncHandler.onSuccess(response); - } - } - } - - public void onTimeout(long costTimeMillis, long timoutMillis) { - if (this.interceptorExecuted.compareAndSet(false, true)) { - try { - interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, this.remoteAddr, this.requestCommand, - null, "CALLBACK_TIMEOUT")); - } catch (Throwable ignore) { - } - if (null != asyncHandler) { - asyncHandler.onTimeout(costTimeMillis, timoutMillis); + public void executeAsyncHandler() { + if (asyncHandler != null) { + if (this.asyncHandlerExecuted.compareAndSet(false, true)) { + if (cause != null) { + asyncHandler.onFailure(requestCommand, cause); + } else { + assert responseCommand != null; + asyncHandler.onSuccess(responseCommand); + } } } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index cbb211e..ac989f8 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -24,7 +24,8 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Iterator; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -44,14 +45,13 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; -import org.apache.rocketmq.remoting.api.interceptor.ExceptionContext; import org.apache.rocketmq.remoting.api.interceptor.Interceptor; import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.api.interceptor.RequestContext; import org.apache.rocketmq.remoting.api.interceptor.ResponseContext; import org.apache.rocketmq.remoting.common.ChannelEventListenerGroup; import org.apache.rocketmq.remoting.common.Pair; -import org.apache.rocketmq.remoting.common.ResponseResult; +import org.apache.rocketmq.remoting.common.ResponseFuture; import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; import org.apache.rocketmq.remoting.config.RemotingConfig; import org.apache.rocketmq.remoting.external.ThreadUtils; @@ -68,7 +68,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { protected final ChannelEventExecutor channelEventExecutor = new ChannelEventExecutor("ChannelEventExecutor"); private final Semaphore semaphoreOneway; private final Semaphore semaphoreAsync; - private final Map<Integer, ResponseResult> ackTables = new ConcurrentHashMap<Integer, ResponseResult>(256); + private final Map<Integer, ResponseFuture> ackTables = new ConcurrentHashMap<Integer, ResponseFuture>(256); private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>(); private final RemotingCommandFactory remotingCommandFactory; private final String remotingInstanceId = UIDGenerator.instance().createUID(); @@ -101,23 +101,23 @@ public abstract class NettyRemotingAbstract implements RemotingService { } void scanResponseTable() { - Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Integer, ResponseResult> next = iterator.next(); - ResponseResult result = next.getValue(); + final List<Integer> rList = new ArrayList<>(); - if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) { - iterator.remove(); - try { - long timeoutMillis = result.getTimeoutMillis(); - long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp(); - result.onTimeout(timeoutMillis, costTimeMillis); - } catch (Throwable e) { - LOG.warn("Error occurred when execute timeout callback !", e); - } finally { - result.release(); - LOG.warn("Removed timeout request {} ", result); - } + for (final Map.Entry<Integer, ResponseFuture> next : this.ackTables.entrySet()) { + ResponseFuture responseFuture = next.getValue(); + + if ((responseFuture.getBeginTimestamp() + responseFuture.getTimeoutMillis()) <= System.currentTimeMillis()) { + rList.add(responseFuture.getRequestId()); + } + } + + for (Integer requestID: rList) { + ResponseFuture rf = this.ackTables.remove(requestID); + + if (rf != null) { + LOG.warn("remove timeout request {} ", rf); + rf.setCause(new RemoteTimeoutException(rf.getRemoteAddr(), rf.getTimeoutMillis())); + executeAsyncHandler(rf); } } } @@ -167,9 +167,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString())); if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { - interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, - extractRemoteAddress(ctx.channel()), cmd, e, "FLOW_CONTROL")); - RemotingCommand response = remotingCommandFactory.createResponse(cmd); response.opCode(RemotingSysResponseCode.SYSTEM_BUSY); response.remark("SYSTEM_BUSY"); @@ -178,49 +175,23 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } - private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { - final ResponseResult responseResult = ackTables.get(cmd.requestID()); - if (responseResult != null) { - responseResult.setResponseCommand(cmd); - responseResult.release(); - - ackTables.remove(cmd.requestID()); - - if (responseResult.getAsyncHandler() != null) { - boolean sameThread = false; - ExecutorService executor = this.getCallbackExecutor(); - if (executor != null) { - try { - executor.submit(new Runnable() { - @Override - public void run() { - try { - responseResult.executeCallbackArrived(responseResult.getResponseCommand()); - } catch (Throwable e) { - LOG.warn("Execute callback error !", e); - } - } - }); - } catch (RejectedExecutionException e) { - sameThread = true; - LOG.warn("Execute submit error !", e); - } - } else { - sameThread = true; - } + private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand response) { + final ResponseFuture responseFuture = ackTables.remove(response.requestID()); + if (responseFuture != null) { + responseFuture.setResponseCommand(response); + responseFuture.release(); - if (sameThread) { - try { - responseResult.executeCallbackArrived(responseResult.getResponseCommand()); - } catch (Throwable e) { - LOG.warn("Execute callback in response thread error !", e); - } - } + this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, + extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response)); + + if (responseFuture.getAsyncHandler() != null) { + executeAsyncHandler(responseFuture); } else { - responseResult.putResponse(cmd); + responseFuture.putResponse(response); + responseFuture.release(); } } else { - LOG.warn("request {} from {} has not matched response !", cmd, extractRemoteAddress(ctx.channel())); + LOG.warn("request {} from {} has not matched response !", response, extractRemoteAddress(ctx.channel())); } } @@ -261,6 +232,60 @@ public abstract class NettyRemotingAbstract implements RemotingService { return this.publicExecutor; } + /** + * Execute callback in callback executor. If callback executor is null, run directly in current thread + */ + private void executeAsyncHandler(final ResponseFuture responseFuture) { + boolean runInThisThread = false; + ExecutorService executor = this.getCallbackExecutor(); + if (executor != null) { + try { + executor.submit(new Runnable() { + @Override + public void run() { + try { + responseFuture.executeAsyncHandler(); + } catch (Throwable e) { + LOG.warn("execute callback in executor exception, and callback throw", e); + } finally { + responseFuture.release(); + } + } + }); + } catch (Throwable e) { + runInThisThread = true; + LOG.warn("execute callback in executor exception, maybe executor busy", e); + } + } else { + runInThisThread = true; + } + + if (runInThisThread) { + try { + responseFuture.executeAsyncHandler(); + } catch (Throwable e) { + LOG.warn("executeInvokeCallback Exception", e); + } finally { + responseFuture.release(); + } + } + } + + private void requestFail(final int requestID, final Throwable cause) { + ResponseFuture responseFuture = ackTables.remove(requestID); + if (responseFuture != null) { + responseFuture.setSendRequestOK(false); + responseFuture.putResponse(null); + responseFuture.setCause(cause); + executeAsyncHandler(responseFuture); + } + } + + private void requestFail(final ResponseFuture responseFuture, final Throwable cause) { + responseFuture.setCause(cause); + executeAsyncHandler(responseFuture); + } + private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) { if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { if (response != null) { @@ -277,8 +302,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) { if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { - //FiXME Exception interceptor can not throw exception - interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, "")); + RemotingCommand response = remotingCommandFactory.createResponse(cmd); + response.opCode(RemotingSysResponseCode.SYSTEM_ERROR); + response.remark("SYSTEM_ERROR"); + writeAndFlush(ctx.channel(), response); } } @@ -288,7 +315,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { final String remoteAddr = extractRemoteAddress(channel); - //FIXME try catch here this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis); @@ -302,27 +328,25 @@ public abstract class NettyRemotingAbstract implements RemotingService { private RemotingCommand invoke0(final String remoteAddr, final Channel channel, final RemotingCommand request, final long timeoutMillis) { try { - final int opaque = request.requestID(); - final ResponseResult responseResult = new ResponseResult(opaque, timeoutMillis); - responseResult.setRequestCommand(request); - //FIXME one interceptor for all case ? - responseResult.setInterceptorGroup(this.interceptorGroup); - responseResult.setRemoteAddr(remoteAddr); + final int requestID = request.requestID(); + final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis); + responseFuture.setRequestCommand(request); + responseFuture.setRemoteAddr(remoteAddr); - this.ackTables.put(opaque, responseResult); + this.ackTables.put(requestID, responseFuture); ChannelFutureListener listener = new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { - responseResult.setSendRequestOK(true); + responseFuture.setSendRequestOK(true); return; } else { - responseResult.setSendRequestOK(false); + responseFuture.setSendRequestOK(false); - ackTables.remove(opaque); - responseResult.setCause(f.cause()); - responseResult.putResponse(null); + ackTables.remove(requestID); + responseFuture.setCause(f.cause()); + responseFuture.putResponse(null); LOG.warn("Send request command to {} failed !", remoteAddr); } @@ -331,14 +355,14 @@ public abstract class NettyRemotingAbstract implements RemotingService { this.writeAndFlush(channel, request, listener); - RemotingCommand responseCommand = responseResult.waitResponse(timeoutMillis); + RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { - if (responseResult.isSendRequestOK()) { - throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseResult.getCause()); + if (responseFuture.isSendRequestOK()) { + throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause()); } else { - throw new RemoteAccessException(extractRemoteAddress(channel), responseResult.getCause()); + throw new RemoteAccessException(extractRemoteAddress(channel), responseFuture.getCause()); } } @@ -360,98 +384,57 @@ public abstract class NettyRemotingAbstract implements RemotingService { this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); - Exception exception = null; - - try { - this.invokeAsync0(remoteAddr, channel, request, timeoutMillis, invokeCallback); - } catch (InterruptedException e) { - exception = e; - } finally { - if (null != exception) { - try { - this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION")); - } catch (Throwable e) { - LOG.warn("onException ", e); - } - } - } + this.invokeAsync0(remoteAddr, channel, request, invokeCallback, timeoutMillis); } private void invokeAsync0(final String remoteAddr, final Channel channel, final RemotingCommand request, - final long timeoutMillis, final AsyncHandler invokeCallback) throws InterruptedException { - boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + final AsyncHandler asyncHandler, final long timeoutMillis) { + boolean acquired = this.semaphoreAsync.tryAcquire(); if (acquired) { final int requestID = request.requestID(); SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync); - final ResponseResult responseResult = new ResponseResult(request.requestID(), timeoutMillis, invokeCallback, once); - responseResult.setRequestCommand(request); - responseResult.setInterceptorGroup(this.interceptorGroup); - responseResult.setRemoteAddr(remoteAddr); + final ResponseFuture responseFuture = new ResponseFuture(requestID, timeoutMillis, asyncHandler, once); + responseFuture.setRequestCommand(request); + responseFuture.setRemoteAddr(remoteAddr); - this.ackTables.put(request.requestID(), responseResult); + this.ackTables.put(requestID, responseFuture); try { ChannelFutureListener listener = new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture f) throws Exception { - responseResult.setSendRequestOK(f.isSuccess()); + public void operationComplete(ChannelFuture f) { + responseFuture.setSendRequestOK(f.isSuccess()); if (f.isSuccess()) { return; } - responseResult.putResponse(null); - ackTables.remove(requestID); - try { - responseResult.executeRequestSendFailed(); - } catch (Throwable e) { - LOG.warn("Execute callback error !", e); - } finally { - responseResult.release(); - } - + requestFail(requestID, f.cause()); LOG.warn("Send request command to channel failed.", remoteAddr); } }; this.writeAndFlush(channel, request, listener); } catch (Exception e) { - responseResult.release(); + requestFail(requestID, e); LOG.error("Send request command to channel " + channel + " error !", e); } } else { - String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d", - timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits()); + String info = String.format("No available async semaphore to issue the request request %s", request.toString()); + requestFail(new ResponseFuture(request.requestID(), timeoutMillis, asyncHandler, null), new RemoteAccessException(info)); LOG.error(info); - throw new RemoteTimeoutException(info); } } - public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request, long timeoutMillis) { + public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request) { request.trafficType(TrafficType.REQUEST_ONEWAY); this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request)); - - Exception exception = null; - - try { - this.invokeOneway0(channel, request, timeoutMillis); - } catch (InterruptedException e) { - exception = e; - } finally { - if (null != exception) { - try { - this.interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request, exception, "REMOTING_EXCEPTION")); - } catch (Throwable e) { - LOG.warn("onException ", e); - } - } - } + this.invokeOneway0(channel, request); } - private void invokeOneway0(final Channel channel, final RemotingCommand request, - final long timeoutMillis) throws InterruptedException { - boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); + private void invokeOneway0(final Channel channel, final RemotingCommand request) { + boolean acquired = this.semaphoreOneway.tryAcquire(); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { @@ -459,7 +442,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { ChannelFutureListener listener = new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture f) throws Exception { + public void operationComplete(ChannelFuture f) { once.release(); if (!f.isSuccess()) { LOG.warn("Send request command to channel {} failed !", socketAddress); @@ -473,10 +456,8 @@ public abstract class NettyRemotingAbstract implements RemotingService { LOG.error("Send request command to channel " + channel + " error !", e); } } else { - String info = String.format("Semaphore tryAcquire %d ms timeout for request %s ,waiting thread nums: %d,availablePermits: %d", - timeoutMillis, request.toString(), semaphoreAsync.getQueueLength(), this.semaphoreAsync.availablePermits()); + String info = String.format("No available oneway semaphore to issue the request %s", request.toString()); LOG.error(info); - throw new RemoteTimeoutException(info); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index b9f9a64..f098de3 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -351,11 +351,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti final Channel channel = this.createIfAbsent(address); if (channel != null && channel.isActive()) { - // We support Netty's channel-level backpressure thereby respecting slow receivers on the other side. - if (!channel.isWritable()) { - // Note: It's up to the layer above a transport to decide whether or not to requeue a canceled write. - LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable()); - } this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis); } else { this.closeChannel(address, channel); @@ -363,15 +358,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } @Override - public void invokeOneWay(final String address, final RemotingCommand request, final long timeoutMillis) { + public void invokeOneWay(final String address, final RemotingCommand request) { final Channel channel = this.createIfAbsent(address); if (channel != null && channel.isActive()) { - if (!channel.isWritable()) { - //if (this.clientConfig.isSocketFlowControl()) { - LOG.warn("Channel statistics - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", channel.bytesBeforeUnwritable(), channel.bytesBeforeWritable()); - //throw new ServiceInvocationFailureException(String.format("Channel[%s] is not writable now", channel.toString())); - } - this.invokeOnewayWithInterceptor(channel, request, timeoutMillis); + this.invokeOnewayWithInterceptor(channel, request); } else { this.closeChannel(address, channel); } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index 60aca5e..40e3cb7 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -195,9 +195,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @Override - public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request, - final long timeoutMillis) { - invokeOnewayWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request, timeoutMillis); + public void invokeOneWay(final RemotingChannel remotingChannel, final RemotingCommand request) { + invokeOnewayWithInterceptor(((NettyChannelImpl) remotingChannel).getChannel(), request); } private class ServerConnectionHandler extends ChannelDuplexHandler {
