This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit 6ff7202494a003e8ad84b7ac465e53a88d2b7c9d Author: yukon <[email protected]> AuthorDate: Wed Jun 5 22:01:27 2019 +0800 Polish the exception structure and add basic tests for NettyRemotingAbstract --- pom.xml | 2 +- .../remoting/api/channel/ChannelEventListener.java | 8 +- .../api/exception/RemoteAccessException.java | 2 +- ...eException.java => RemoteRuntimeException.java} | 10 +- .../remoting/api/interceptor/RequestContext.java | 2 +- .../remoting/api/interceptor/ResponseContext.java | 2 +- .../remoting/common/ChannelEventListenerGroup.java | 4 +- .../rocketmq/remoting/common/ResponseFuture.java | 17 +- .../remoting/impl/command/RemotingCommandImpl.java | 7 +- .../remoting/impl/netty/NettyChannelEvent.java | 2 +- .../remoting/impl/netty/NettyRemotingAbstract.java | 61 +++- .../remoting/impl/netty/NettyRemotingClient.java | 4 +- .../remoting/impl/netty/NettyRemotingServer.java | 2 - .../remoting/impl/netty/handler/Encoder.java | 7 +- .../rocketmq/remoting/internal/RemotingUtil.java | 9 +- .../org/apache/rocketmq/remoting/BaseTest.java | 56 +++- .../remoting/common/ResponseFutureTest.java | 4 +- .../impl/netty/NettyRemotingAbstractTest.java | 364 +++++++++++++++++++++ 18 files changed, 506 insertions(+), 57 deletions(-) diff --git a/pom.xml b/pom.xml index 23eeeb5..3d28b24 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> - <version>3.4</version> + <version>3.6</version> </dependency> <dependency> <groupId>org.jetbrains</groupId> diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java index 0c0afcf..23b6e88 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/channel/ChannelEventListener.java @@ -18,11 +18,11 @@ package org.apache.rocketmq.remoting.api.channel; public interface ChannelEventListener { - void onChannelConnect(final RemotingChannel channel); + void onChannelConnect(RemotingChannel channel); - void onChannelClose(final RemotingChannel channel); + void onChannelClose(RemotingChannel channel); - void onChannelException(final RemotingChannel channel); + void onChannelException(RemotingChannel channel, Throwable cause); - void onChannelIdle(final RemotingChannel channel); + void onChannelIdle(RemotingChannel channel); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java index 6ce6dd4..d6d46f0 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteAccessException.java @@ -30,7 +30,7 @@ package org.apache.rocketmq.remoting.api.exception; * * @since 1.0.0 */ -public class RemoteAccessException extends NestedRuntimeException { +public class RemoteAccessException extends RemoteRuntimeException { private static final long serialVersionUID = 6280428909532427263L; /** diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java similarity index 90% rename from remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java rename to remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java index 7ef01db..a83be9f 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/RemoteRuntimeException.java @@ -27,26 +27,26 @@ package org.apache.rocketmq.remoting.api.exception; * * @since 1.0.0 */ -public abstract class NestedRuntimeException extends RuntimeException { +public abstract class RemoteRuntimeException extends RuntimeException { private static final long serialVersionUID = -8371779880133933367L; /** - * Construct a {@code NestedRuntimeException} with the specified detail message. + * Construct a {@code RemoteRuntimeException} with the specified detail message. * * @param msg the detail message */ - public NestedRuntimeException(String msg) { + public RemoteRuntimeException(String msg) { super(msg); } /** - * Construct a {@code NestedRuntimeException} with the specified detail message + * Construct a {@code RemoteRuntimeException} with the specified detail message * and nested exception. * * @param msg the detail message * @param cause the nested exception */ - public NestedRuntimeException(String msg, Throwable cause) { + public RemoteRuntimeException(String msg, Throwable cause) { super(msg, cause); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java index d961556..a93e71c 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/RequestContext.java @@ -60,6 +60,6 @@ public class RequestContext { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE); } } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java index 005aa28..f076d8f 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java @@ -36,7 +36,7 @@ public class ResponseContext extends RequestContext { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE); } public RemotingCommand getResponse() { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java index 8af61f7..4c374e9 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ChannelEventListenerGroup.java @@ -47,9 +47,9 @@ public class ChannelEventListenerGroup { } } - public void onChannelException(final RemotingChannel channel) { + public void onChannelException(final RemotingChannel channel, final Throwable cause) { for (ChannelEventListener listener : listenerList) { - listener.onChannelException(channel); + listener.onChannelException(channel, cause); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java index e6c394b..6a2f246 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java @@ -21,23 +21,32 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringExclude; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException; import org.jetbrains.annotations.Nullable; public class ResponseFuture { private final long beginTimestamp = System.currentTimeMillis(); + + @ToStringExclude private final CountDownLatch countDownLatch = new CountDownLatch(1); + @ToStringExclude private final AtomicBoolean asyncHandlerExecuted = new AtomicBoolean(false); private int requestId; private long timeoutMillis; + + @ToStringExclude private AsyncHandler asyncHandler; private volatile RemotingCommand responseCommand; private volatile boolean sendRequestOK = true; - private volatile Throwable cause; + private volatile RemoteRuntimeException cause; + + @ToStringExclude private SemaphoreReleaseOnlyOnce once; private RemotingCommand requestCommand; @@ -108,11 +117,11 @@ public class ResponseFuture { return asyncHandler; } - public Throwable getCause() { + public RemoteRuntimeException getCause() { return cause; } - public void setCause(Throwable cause) { + public void setCause(RemoteRuntimeException cause) { this.cause = cause; } @@ -146,6 +155,6 @@ public class ResponseFuture { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.DEFAULT_STYLE); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java index 4d1af44..8454616 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringExclude; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.TrafficType; @@ -36,7 +37,11 @@ public class RemotingCommandImpl implements RemotingCommand { private TrafficType trafficType = TrafficType.REQUEST_SYNC; private short opCode = RemotingSysResponseCode.SUCCESS; private String remark = ""; + + @ToStringExclude private Map<String, String> properties = new HashMap<>(); + + @ToStringExclude private byte[] payload; protected RemotingCommandImpl() { @@ -139,6 +144,6 @@ public class RemotingCommandImpl implements RemotingCommand { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, ToStringStyle.SIMPLE_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.NO_CLASS_NAME_STYLE); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java index ec9cece..097213c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyChannelEvent.java @@ -50,6 +50,6 @@ public class NettyChannelEvent { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.NO_CLASS_NAME_STYLE); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 9e865d0..de4fc81 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -43,6 +43,7 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.RemotingCommandFactory; import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; +import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException; import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; import org.apache.rocketmq.remoting.api.interceptor.Interceptor; import org.apache.rocketmq.remoting.api.interceptor.InterceptorGroup; @@ -58,7 +59,6 @@ import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode; import org.apache.rocketmq.remoting.internal.RemotingUtil; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,12 +93,35 @@ public abstract class NettyRemotingAbstract implements RemotingService { * responding processor in this map to handle the request. */ private final Map<Short, Pair<RequestProcessor, ExecutorService>> processorTables = new ConcurrentHashMap<>(); + + /** + * This factory provides methods to create RemotingCommand. + */ private final RemotingCommandFactory remotingCommandFactory; + /** + * Executor to execute RequestProcessor without specific executor. + */ private final ExecutorService publicExecutor; + + /** + * Invoke the async handler in this executor when process response. + */ private final ExecutorService asyncHandlerExecutor; + + /** + * This scheduled executor provides the ability to govern on-going response table. + */ protected ScheduledExecutorService houseKeepingService = ThreadUtils.newSingleThreadScheduledExecutor("HouseKeepingService", true); + + /** + * Provides custom interceptor at the occurrence of beforeRequest and afterResponseReceived event. + */ private InterceptorGroup interceptorGroup = new InterceptorGroup(); + + /** + * Provides listener mechanism to handle netty channel events. + */ private ChannelEventListenerGroup channelEventListenerGroup = new ChannelEventListenerGroup(); NettyRemotingAbstract(RemotingConfig remotingConfig) { @@ -110,7 +133,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { this.asyncHandlerExecutor = ThreadUtils.newFixedThreadPool( remotingConfig.getAsyncHandlerExecutorThreads(), - 10000, "Remoting-PublicExecutor", true); + 10000, "Remoting-AsyncExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(); } @@ -144,8 +167,8 @@ public abstract class NettyRemotingAbstract implements RemotingService { ResponseFuture rf = this.ackTables.remove(requestID); if (rf != null) { - LOG.warn("remove timeout request {} ", rf); - rf.setCause(new RemoteTimeoutException(rf.getRemoteAddr(), rf.getTimeoutMillis())); + LOG.warn("Removes timeout request {} ", rf.getRequestCommand()); + rf.setCause(new RemoteTimeoutException(String.format("Request to %s timeout", rf.getRemoteAddr()), rf.getTimeoutMillis())); executeAsyncHandler(rf); } } @@ -153,6 +176,8 @@ public abstract class NettyRemotingAbstract implements RemotingService { @Override public void start() { + startUpHouseKeepingService(); + if (this.channelEventListenerGroup.size() > 0) { this.channelEventExecutor.start(); } @@ -230,11 +255,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { responseFuture.release(); } } else { - LOG.warn("request {} from {} has not matched response !", response, RemotingUtil.extractRemoteAddress(ctx.channel())); + LOG.warn("Response {} from {} doesn't have a matched request!", response, RemotingUtil.extractRemoteAddress(ctx.channel())); } } - @NotNull private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd, final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) { return new Runnable() { @@ -302,7 +326,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } - private void requestFail(final int requestID, final Throwable cause) { + private void requestFail(final int requestID, final RemoteRuntimeException cause) { ResponseFuture responseFuture = ackTables.remove(requestID); if (responseFuture != null) { responseFuture.setSendRequestOK(false); @@ -312,7 +336,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } - private void requestFail(final ResponseFuture responseFuture, final Throwable cause) { + private void requestFail(final ResponseFuture responseFuture, final RemoteRuntimeException cause) { responseFuture.setCause(cause); executeAsyncHandler(responseFuture); } @@ -368,7 +392,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { ChannelFutureListener listener = new ChannelFutureListener() { @Override - public void operationComplete(ChannelFuture f) throws Exception { + public void operationComplete(ChannelFuture f) { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; @@ -376,7 +400,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { responseFuture.setSendRequestOK(false); ackTables.remove(requestID); - responseFuture.setCause(f.cause()); + responseFuture.setCause(new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause())); responseFuture.putResponse(null); LOG.warn("Send request command to {} failed !", remoteAddr); @@ -390,9 +414,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { - throw new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause()); + responseFuture.setCause(new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis)); + throw responseFuture.getCause(); } else { - throw new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), responseFuture.getCause()); + throw responseFuture.getCause(); } } @@ -439,14 +464,14 @@ public abstract class NettyRemotingAbstract implements RemotingService { return; } - requestFail(requestID, f.cause()); + requestFail(requestID, new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), f.cause())); LOG.warn("Send request command to channel failed.", remoteAddr); } }; this.writeAndFlush(channel, request, listener); } catch (Exception e) { - requestFail(requestID, e); + requestFail(requestID, new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), e)); LOG.error("Send request command to channel " + channel + " error !", e); } } else { @@ -543,7 +568,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (this.eventQueue.size() <= MAX_SIZE) { this.eventQueue.add(event); } else { - LOG.warn("event queue size[{}] enough, so drop this event {}", this.eventQueue.size(), event.toString()); + LOG.warn("Event queue size[{}] meets the limit, so drop this event {}", this.eventQueue.size(), event.toString()); } } @@ -559,7 +584,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (event != null && listener != null) { RemotingChannel channel = new NettyChannelImpl(event.getChannel()); - LOG.warn("Channel Event, {}", event); + LOG.info("Dispatch received channel event, {}", event); switch (event.getType()) { case IDLE: @@ -572,14 +597,14 @@ public abstract class NettyRemotingAbstract implements RemotingService { listener.onChannelConnect(channel); break; case EXCEPTION: - listener.onChannelException(channel); + listener.onChannelException(channel, event.getCause()); break; default: break; } } } catch (Exception e) { - LOG.error("error", e); + LOG.warn("Exception thrown when dispatching channel event", e); break; } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index 6b2796e..c146813 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -98,8 +98,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti }); applyOptions(clientBootstrap); - - startUpHouseKeepingService(); } @Override @@ -237,7 +235,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LOG.info("Close channel {} because of error {} ", ctx.channel(), cause); NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel()); - putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel())); + putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause)); } } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index f0dbb45..0967208 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -114,8 +114,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ChannelFuture channelFuture = this.serverBootstrap.bind(this.serverConfig.getServerListenPort()).syncUninterruptibly(); this.port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort(); - - startUpHouseKeepingService(); } @Override diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java index 702e2b4..3c5a90e 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/handler/Encoder.java @@ -22,12 +22,12 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; -import java.net.InetSocketAddress; import org.apache.rocketmq.remoting.api.buffer.ByteBufferWrapper; import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.exception.RemoteCodecException; import org.apache.rocketmq.remoting.impl.buffer.NettyByteBufferWrapper; import org.apache.rocketmq.remoting.impl.command.CodecHelper; +import org.apache.rocketmq.remoting.internal.RemotingUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,10 +44,7 @@ public class Encoder extends MessageToByteEncoder<RemotingCommand> { encode(remotingCommand, wrapper); } catch (final RemoteCodecException e) { - String remoteAddress = "UnKnown"; - if (ctx.channel().remoteAddress() instanceof InetSocketAddress) { - remoteAddress = ((InetSocketAddress) ctx.channel().remoteAddress()).getAddress().getHostAddress(); - } + String remoteAddress = RemotingUtil.extractRemoteAddress(ctx.channel()); LOG.error(String.format("Error occurred when encoding command for channel %s", remoteAddress), e); ctx.channel().close().addListener(new ChannelFutureListener() { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java index 89e4bff..9d23fcf 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java @@ -19,9 +19,16 @@ package org.apache.rocketmq.remoting.internal; import io.netty.channel.Channel; import java.net.InetSocketAddress; +import java.net.SocketAddress; public class RemotingUtil { public static String extractRemoteAddress(Channel channel) { - return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress(); + SocketAddress socketAddress = channel.remoteAddress(); + + if (socketAddress instanceof InetSocketAddress) { + return ((InetSocketAddress) socketAddress).getAddress().getHostAddress(); + } + + return "Unknown"; } } diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java index ed7c93a..6a3112b 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java @@ -19,7 +19,7 @@ package org.apache.rocketmq.remoting; import java.util.Random; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.RandomStringUtils; @@ -27,11 +27,17 @@ import org.apache.rocketmq.remoting.api.command.RemotingCommand; import org.apache.rocketmq.remoting.api.command.TrafficType; import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; +import org.assertj.core.api.Fail; public class BaseTest { - protected void runInThreads(final Runnable runnable, int threadsNum) { - ExecutorService executor = Executors.newFixedThreadPool(threadsNum); - for (int i = 0; i < threadsNum; i++) { + protected void scheduleInThreads(final Runnable runnable, int periodMillis) { + final ScheduledExecutorService executor = ThreadUtils.newSingleThreadScheduledExecutor("UnitTests", true); + executor.scheduleAtFixedRate(runnable, 0, periodMillis, TimeUnit.MILLISECONDS); + } + + protected void runInThreads(final Runnable runnable, int concurrentNum) { + final ExecutorService executor = ThreadUtils.newFixedThreadPool(concurrentNum, 1000, "UnitTests", true); + for (int i = 0; i < concurrentNum; i++) { executor.submit(new Runnable() { @Override public void run() { @@ -39,8 +45,6 @@ public class BaseTest { } }); } - - ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS); } protected void runInThreads(final Runnable runnable, int threadsNum, @@ -84,4 +88,44 @@ public class BaseTest { return command; } + + protected <T> ObjectFuture<T> newObjectFuture(int permits, int timeoutMillis) { + return new ObjectFuture<>(permits, timeoutMillis); + } + + protected class ObjectFuture<T> { + private T object; + private Semaphore semaphore; + private int permits; + private int timeoutMillis; + + public ObjectFuture(int permits, int timeoutMillis) { + semaphore = new Semaphore(0); + this.permits = permits; + this.timeoutMillis = timeoutMillis; + } + + public void release() { + semaphore.release(); + } + + public void putObject(T object) { + this.object = object; + } + + public T getObject() { + try { + if (!semaphore.tryAcquire(permits, timeoutMillis, TimeUnit.MILLISECONDS)) { + Fail.fail("Get permits failed"); + } + } catch (InterruptedException e) { + Fail.fail("Get object failed", e); + } + return this.object; + } + } + + public class UnitTestException extends RuntimeException { + + } } diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java index c8e1f25..1a16e56 100644 --- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/common/ResponseFutureTest.java @@ -21,6 +21,8 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.BaseTest; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; +import org.apache.rocketmq.remoting.api.exception.RemoteRuntimeException; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; import org.junit.Test; @@ -56,7 +58,7 @@ public class ResponseFutureTest extends BaseTest { public void executeAsyncHandler_Failure() { final RemotingCommand reqCommand = factory.createRequest(); final RemotingCommand resCommand = factory.createResponse(reqCommand); - final Throwable exception = new RuntimeException("Test Exception"); + final RemoteRuntimeException exception = new RemoteAccessException("Test Exception"); future = new ResponseFuture(1, 3000, new AsyncHandler() { @Override public void onFailure(final RemotingCommand request, final Throwable cause) { diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java new file mode 100644 index 0000000..5537579 --- /dev/null +++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstractTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.channel.DefaultChannelPromise; +import io.netty.channel.DefaultEventLoop; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.BaseTest; +import org.apache.rocketmq.remoting.api.AsyncHandler; +import org.apache.rocketmq.remoting.api.RequestProcessor; +import org.apache.rocketmq.remoting.api.channel.ChannelEventListener; +import org.apache.rocketmq.remoting.api.channel.RemotingChannel; +import org.apache.rocketmq.remoting.api.command.RemotingCommand; +import org.apache.rocketmq.remoting.api.exception.RemoteAccessException; +import org.apache.rocketmq.remoting.api.exception.RemoteTimeoutException; +import org.apache.rocketmq.remoting.config.RemotingClientConfig; +import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; +import org.apache.rocketmq.remoting.impl.netty.handler.Decoder; +import org.apache.rocketmq.remoting.impl.netty.handler.Encoder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class NettyRemotingAbstractTest extends BaseTest { + + private NettyRemotingAbstract remotingAbstract; + + @Mock + private Channel mockedClientChannel; + + private EmbeddedChannel clientChannel; + + private EmbeddedChannel serverChannel; + + private RemotingCommand remotingRequest; + + private short requestCode = 123; + + @Before + public void setUp() { + remotingAbstract = new NettyRemotingAbstract(new RemotingClientConfig()) { + }; + + clientChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new SimpleChannelInboundHandler<RemotingCommand>() { + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final RemotingCommand msg) throws Exception { + remotingAbstract.processMessageReceived(ctx, msg); + } + }); + + serverChannel = new EmbeddedChannel(new Encoder(), new Decoder(), new SimpleChannelInboundHandler<RemotingCommand>() { + + @Override + protected void channelRead0(final ChannelHandlerContext ctx, final RemotingCommand msg) throws Exception { + remotingAbstract.processMessageReceived(ctx, msg); + } + }); + + remotingRequest = remotingAbstract.commandFactory().createRequest(); + remotingRequest.cmdCode(requestCode); + remotingRequest.payload("Ping".getBytes()); + + // Simulate the tcp stack + scheduleInThreads(new Runnable() { + @Override + public void run() { + ByteBuf msg = clientChannel.readOutbound(); + if (msg != null) { + serverChannel.writeInbound(msg); + } + + msg = serverChannel.readOutbound(); + + if (msg != null) { + clientChannel.writeInbound(msg); + } + } + }, 1); + + remotingAbstract.start(); + } + + @After + public void tearDown() { + remotingAbstract.stop(); + } + + @Test + public void putNettyEvent_Success() { + final Throwable exception = new RuntimeException(); + final ObjectFuture objectFuture = newObjectFuture(4, 100); + remotingAbstract.registerChannelEventListener(new ChannelEventListener() { + @Override + public void onChannelConnect(final RemotingChannel channel) { + if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) { + objectFuture.release(); + } + } + + @Override + public void onChannelClose(final RemotingChannel channel) { + if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) { + objectFuture.release(); + } + } + + @Override + public void onChannelException(final RemotingChannel channel, final Throwable cause) { + if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel && exception == cause) { + objectFuture.release(); + } + } + + @Override + public void onChannelIdle(final RemotingChannel channel) { + if (((NettyChannelImpl) channel).getChannel() == mockedClientChannel) { + objectFuture.release(); + } + } + }); + + remotingAbstract.channelEventExecutor.start(); + + remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, mockedClientChannel)); + remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, mockedClientChannel)); + remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, mockedClientChannel)); + remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, mockedClientChannel, exception)); + + objectFuture.getObject(); + } + + @Test + public void putNettyEvent_EventDropped() throws InterruptedException { + final Semaphore eventCount = new Semaphore(0); + final Semaphore droppedEvent = new Semaphore(0); + + remotingAbstract.registerChannelEventListener(new ChannelEventListener() { + @Override + public void onChannelConnect(final RemotingChannel channel) { + eventCount.release(); + } + + @Override + public void onChannelClose(final RemotingChannel channel) { + droppedEvent.release(); + } + + @Override + public void onChannelException(final RemotingChannel channel, final Throwable cause) { + + } + + @Override + public void onChannelIdle(final RemotingChannel channel) { + + } + }); + + int maxLimit = 10001; + + for (int i = 0; i < maxLimit; i++) { + remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CONNECT, mockedClientChannel)); + } + + // This event will be dropped + remotingAbstract.putNettyEvent(new NettyChannelEvent(NettyChannelEventType.CLOSE, mockedClientChannel)); + + remotingAbstract.channelEventExecutor.start(); + + assertThat(eventCount.tryAcquire(maxLimit, 1000, TimeUnit.MILLISECONDS)).isTrue(); + + assertThat(droppedEvent.tryAcquire(1, 10, TimeUnit.MILLISECONDS)).isFalse(); + } + + @Test + public void scanResponseTable_RemoveTimeoutRequest() throws InterruptedException { + final ObjectFuture<Throwable> objectFuture = newObjectFuture(1, 10); + + remotingAbstract.invokeAsyncWithInterceptor(new EmbeddedChannel(), + remotingAbstract.commandFactory().createRequest(), + new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + objectFuture.putObject(cause); + objectFuture.release(); + } + + @Override + public void onSuccess(final RemotingCommand response) { + + } + }, 10); + + TimeUnit.MILLISECONDS.sleep(15); + remotingAbstract.scanResponseTable(); + + assertThat(objectFuture.getObject()).isInstanceOf(RemoteTimeoutException.class); + } + + @Test + public void invokeWithInterceptor_Success() { + registerNormalProcessor(); + + RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 3000); + + assertThat(new String(response.payload())).isEqualTo("Pong"); + } + + @Test + public void invokeWithInterceptor_Timeout() { + registerTimeoutProcessor(20); + + try { + RemotingCommand response = remotingAbstract.invokeWithInterceptor(clientChannel, remotingRequest, 10); + failBecauseExceptionWasNotThrown(RemoteTimeoutException.class); + } catch (Exception e) { + assertThat(e).isInstanceOf(RemoteTimeoutException.class); + } + } + + @Test + public void invokeWithInterceptor_AccessException() { + ChannelPromise channelPromise = new DefaultChannelPromise(mockedClientChannel, new DefaultEventLoop()); + + when(mockedClientChannel.writeAndFlush(any(Object.class))).thenReturn(channelPromise); + channelPromise.setFailure(new UnitTestException()); + + try { + RemotingCommand response = remotingAbstract.invokeWithInterceptor(mockedClientChannel, remotingRequest, 10); + failBecauseExceptionWasNotThrown(RemoteAccessException.class); + } catch (Exception e) { + assertThat(e.getCause()).isInstanceOf(UnitTestException.class); + assertThat(e).isInstanceOf(RemoteAccessException.class); + } + } + + @Test + public void invokeAsyncWithInterceptor_Success() { + registerNormalProcessor(); + + final ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10); + + remotingAbstract.invokeAsyncWithInterceptor(clientChannel, remotingRequest, new AsyncHandler() { + @Override + public void onFailure(final RemotingCommand request, final Throwable cause) { + + } + + @Override + public void onSuccess(final RemotingCommand response) { + objectFuture.putObject(response); + objectFuture.release(); + } + }, 3000); + + assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong"); + } + + @Test + public void invokeOnewayWithInterceptor_Success() { + ObjectFuture<RemotingCommand> objectFuture = newObjectFuture(1, 10); + registerOnewayProcessor(objectFuture); + + remotingAbstract.invokeOnewayWithInterceptor(clientChannel, remotingRequest); + + // Receive the response directly + assertThat(new String(objectFuture.getObject().payload())).isEqualTo("Pong"); + } + + @Test + public void registerInterceptor() { + } + + @Test + public void registerRequestProcessor() { + } + + @Test + public void registerRequestProcessor1() { + } + + @Test + public void unregisterRequestProcessor() { + } + + @Test + public void processor() { + } + + @Test + public void registerChannelEventListener() { + } + + private void registerTimeoutProcessor(final int timeoutMillis) { + remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingAbstract.commandFactory().createResponse(request); + response.payload("Pong".getBytes()); + try { + TimeUnit.MILLISECONDS.sleep(timeoutMillis); + } catch (InterruptedException ignore) { + } + return response; + } + }); + } + + private void registerNormalProcessor() { + remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingAbstract.commandFactory().createResponse(request); + response.payload("Pong".getBytes()); + return response; + } + }); + } + + private void registerOnewayProcessor(final ObjectFuture<RemotingCommand> objectFuture) { + remotingAbstract.registerRequestProcessor(requestCode, new RequestProcessor() { + @Override + public RemotingCommand processRequest(final RemotingChannel channel, final RemotingCommand request) { + RemotingCommand response = remotingAbstract.commandFactory().createResponse(request); + response.payload("Pong".getBytes()); + objectFuture.putObject(response); + objectFuture.release(); + return response; + } + }); + } +} \ No newline at end of file
