This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit e2069f54d781f81be1b850565e9ce63fd7b070b0 Author: yukon <[email protected]> AuthorDate: Thu May 16 14:37:21 2019 +0800 Reformat code --- remoting-core/remoting-api/pom.xml | 4 +- .../api/exception/NestedRuntimeException.java | 18 +- .../remoting/api/interceptor/ResponseContext.java | 10 +- remoting-core/remoting-impl/pom.xml | 4 +- .../rocketmq/remoting/external/ThreadUtils.java | 29 ++-- .../impl/buffer/NettyByteBufferWrapper.java | 12 +- .../remoting/impl/channel/FileRegionImpl.java | 10 +- .../remoting/impl/channel/NettyChannelImpl.java | 10 +- .../remoting/impl/command/CodecHelper.java | 6 +- .../remoting/impl/command/RemotingCommandImpl.java | 40 ++--- .../remoting/impl/netty/NettyRemotingAbstract.java | 166 +++++++++--------- .../remoting/impl/netty/NettyRemotingClient.java | 186 ++++++++++----------- .../remoting/impl/netty/NettyRemotingServer.java | 72 ++++---- .../rocketmq/remoting/internal/ByteUtils.java | 70 ++++---- .../rocketmq/remoting/internal/ExceptionUtils.java | 38 ++--- .../rocketmq/remoting/internal/UIDGenerator.java | 44 ++--- 16 files changed, 359 insertions(+), 360 deletions(-) diff --git a/remoting-core/remoting-api/pom.xml b/remoting-core/remoting-api/pom.xml index a5d1d0b..ad7676f 100644 --- a/remoting-core/remoting-api/pom.xml +++ b/remoting-core/remoting-api/pom.xml @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq-xxx</artifactId> diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java index 7ef01db..7179c91 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java @@ -51,6 +51,15 @@ public abstract class NestedRuntimeException extends RuntimeException { } /** + * Return the detail message, including the message from the nested exception + * if there is one. + */ + @Override + public String getMessage() { + return getMessageWithCause(super.getMessage(), getCause()); + } + + /** * Build a message for the given base message and root cause. * * @param message the base message @@ -71,15 +80,6 @@ public abstract class NestedRuntimeException extends RuntimeException { } /** - * Return the detail message, including the message from the nested exception - * if there is one. - */ - @Override - public String getMessage() { - return getMessageWithCause(super.getMessage(), getCause()); - } - - /** * Retrieve the innermost cause of this exception, if any. * * @return the innermost exception, or {@code null} if none diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java index 97ec2e6..c7f7a9b 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/interceptor/ResponseContext.java @@ -58,6 +58,11 @@ public class ResponseContext extends RequestContext { this.request = request; } + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); + } + public RemotingCommand getResponse() { return response; } @@ -65,9 +70,4 @@ public class ResponseContext extends RequestContext { public void setResponse(RemotingCommand response) { this.response = response; } - - @Override - public String toString() { - return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE); - } } diff --git a/remoting-core/remoting-impl/pom.xml b/remoting-core/remoting-impl/pom.xml index da1b8df..e4887cd 100644 --- a/remoting-core/remoting-impl/pom.xml +++ b/remoting-core/remoting-impl/pom.xml @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" +<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>rocketmq-xxx</artifactId> diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java index 5a50089..a4a7487 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/external/ThreadUtils.java @@ -49,7 +49,21 @@ public final class ThreadUtils { return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, newGenericThreadFactory(processName, isDaemon)); } - public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, boolean isDaemon) { + public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) { + return new ThreadFactory() { + private AtomicInteger threadIndex = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet())); + thread.setDaemon(isDaemon); + return thread; + } + }; + } + + public static ExecutorService newFixedThreadPool(int nThreads, int workQueueCapacity, String processName, + boolean isDaemon) { return new ThreadPoolExecutor( nThreads, nThreads, @@ -80,19 +94,6 @@ public final class ThreadUtils { return newGenericThreadFactory(processName, threads, false); } - public static ThreadFactory newGenericThreadFactory(final String processName, final boolean isDaemon) { - return new ThreadFactory() { - private AtomicInteger threadIndex = new AtomicInteger(0); - - @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, String.format("%s_%d", processName, this.threadIndex.incrementAndGet())); - thread.setDaemon(isDaemon); - return thread; - } - }; - } - public static ThreadFactory newGenericThreadFactory(final String processName, final int threads, final boolean isDaemon) { return new ThreadFactory() { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java index 5a71452..d4fc15c 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/buffer/NettyByteBufferWrapper.java @@ -49,13 +49,13 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { } @Override - public void writeShort(final short value) { - buffer.writeShort(value); + public void writeInt(int data) { + buffer.writeInt(data); } @Override - public void writeInt(int data) { - buffer.writeInt(data); + public void writeShort(final short value) { + buffer.writeShort(value); } @Override @@ -69,12 +69,12 @@ public class NettyByteBufferWrapper implements ByteBufferWrapper { } @Override - public void readBytes(final ByteBuffer dst) { + public void readBytes(byte[] dst) { buffer.readBytes(dst); } @Override - public void readBytes(byte[] dst) { + public void readBytes(final ByteBuffer dst) { buffer.readBytes(dst); } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java index b90afc1..c7640ce 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/FileRegionImpl.java @@ -56,11 +56,6 @@ public class FileRegionImpl extends AbstractReferenceCounted implements FileRegi } @Override - protected void deallocate() { - chunkRegion.release(); - } - - @Override public FileRegion retain() { super.retain(); return this; @@ -78,6 +73,11 @@ public class FileRegionImpl extends AbstractReferenceCounted implements FileRegi } @Override + protected void deallocate() { + chunkRegion.release(); + } + + @Override public FileRegion touch(Object hint) { return this; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java index ba4a969..4427cd7 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/channel/NettyChannelImpl.java @@ -70,6 +70,11 @@ public class NettyChannelImpl implements RemotingChannel { } @Override + public int hashCode() { + return channel != null ? channel.hashCode() : 0; + } + + @Override public boolean equals(final Object o) { if (this == o) return true; @@ -83,11 +88,6 @@ public class NettyChannelImpl implements RemotingChannel { } @Override - public int hashCode() { - return channel != null ? channel.hashCode() : 0; - } - - @Override public String toString() { return "NettyChannelImpl [channel=" + channel + "]"; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java index bfc536b..df88504 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/CodecHelper.java @@ -28,10 +28,9 @@ public class CodecHelper { // ProtocolMagic(1) + TotalLength(4) + CmdCode(2) + CmdVersion(2) + RequestID(4) + TrafficType(1) + OpCode(2) // + RemarkLen(2) + PropertiesSize(2) + PayloadLen(4); public final static int MIN_PROTOCOL_LEN = 1 + 4 + 2 + 2 + 4 + 1 + 2 + 2 + 2 + 4; + public final static byte PROTOCOL_MAGIC = 0x14; private final static char PROPERTY_SEPARATOR = '\n'; private final static Charset REMOTING_CHARSET = Charset.forName("UTF-8"); - - public final static byte PROTOCOL_MAGIC = 0x14; private final static int REMARK_MAX_LEN = Short.MAX_VALUE; private final static int PROPERTY_MAX_LEN = 524288; // 512KB private final static int PAYLOAD_MAX_LEN = 16777216; // 16MB @@ -41,7 +40,7 @@ public class CodecHelper { out.writeByte(PROTOCOL_MAGIC); short remarkLen = 0; - byte [] remark = null; + byte[] remark = null; if (command.remark() != null) { remark = command.remark().getBytes(REMOTING_CHARSET); if (remark.length > REMARK_MAX_LEN) { @@ -124,7 +123,6 @@ public class CodecHelper { cmd.trafficType(TrafficType.parse(in.readByte())); cmd.opCode(in.readShort()); - short remarkLen = in.readShort(); if (remarkLen > 0) { byte[] bytes = new byte[remarkLen]; diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java index b405eda..3a67ead 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/command/RemotingCommandImpl.java @@ -42,6 +42,26 @@ public class RemotingCommandImpl implements RemotingCommand { } @Override + public short cmdCode() { + return this.cmdCode; + } + + @Override + public void cmdCode(short code) { + this.cmdCode = code; + } + + @Override + public short cmdVersion() { + return this.cmdVersion; + } + + @Override + public void cmdVersion(short version) { + this.cmdVersion = version; + } + + @Override public int requestID() { return requestId; } @@ -102,26 +122,6 @@ public class RemotingCommandImpl implements RemotingCommand { } @Override - public short cmdCode() { - return this.cmdCode; - } - - @Override - public void cmdCode(short code) { - this.cmdCode = code; - } - - @Override - public short cmdVersion() { - return this.cmdVersion; - } - - @Override - public void cmdVersion(short version) { - this.cmdVersion = version; - } - - @Override public byte[] payload() { return this.payload; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index d54c71a..b351445 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -100,6 +100,31 @@ public abstract class NettyRemotingAbstract implements RemotingService { }, 3000, 1000, TimeUnit.MICROSECONDS); } + void scanResponseTable() { + /* + Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Integer, ResponseResult> next = iterator.next(); + ResponseResult result = next.getValue(); + + if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) { + iterator.remove(); + try { + long timeoutMillis = result.getTimeoutMillis(); + long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp(); + result.onTimeout(timeoutMillis, costTimeMillis); + LOG.error("scan response table command {} failed", result.getRequestId()); + } catch (Throwable e) { + LOG.warn("Error occurred when execute timeout callback !", e); + } finally { + result.release(); + LOG.warn("Removed timeout request {} ", result); + } + } + } + */ + } + @Override public void start() { if (this.channelEventListenerGroup.size() > 0) { @@ -158,52 +183,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } - @NotNull - private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd, - final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) { - return new Runnable() { - @Override - public void run() { - try { - interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE, - extractRemoteAddress(ctx.channel()), cmd)); - - RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd); - - interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE, - extractRemoteAddress(ctx.channel()), cmd, response)); - - handleResponse(response, cmd, ctx); - } catch (Throwable e) { - LOG.error(String.format("Process request %s error !", cmd.toString()), e); - - handleException(e, cmd, ctx); - } - } - }; - } - - private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) { - if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { - //FiXME Exception interceptor can not throw exception - interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, "")); - } - } - - private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) { - if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { - if (response != null) { - try { - writeAndFlush(ctx.channel(), response); - } catch (Throwable e) { - LOG.error(String.format("Process request %s success, but transfer response %s failed !", - cmd.toString(), response.toString()), e); - } - } - } - - } - private void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final ResponseResult responseResult = ackTables.get(cmd.requestID()); if (responseResult != null) { @@ -254,8 +233,33 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } - private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) { - channel.writeAndFlush(msg).addListener(listener); + @NotNull + private Runnable buildProcessorTask(final ChannelHandlerContext ctx, final RemotingCommand cmd, + final Pair<RequestProcessor, ExecutorService> processorExecutorPair, final RemotingChannel channel) { + return new Runnable() { + @Override + public void run() { + try { + interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE, + extractRemoteAddress(ctx.channel()), cmd)); + + RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd); + + interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE, + extractRemoteAddress(ctx.channel()), cmd, response)); + + handleResponse(response, cmd, ctx); + } catch (Throwable e) { + LOG.error(String.format("Process request %s error !", cmd.toString()), e); + + handleException(e, cmd, ctx); + } + } + }; + } + + protected String extractRemoteAddress(Channel channel) { + return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress(); } private void writeAndFlush(final Channel channel, final Object msg) { @@ -266,29 +270,25 @@ public abstract class NettyRemotingAbstract implements RemotingService { return this.publicExecutor; } - void scanResponseTable() { - /* - Iterator<Map.Entry<Integer, ResponseResult>> iterator = this.ackTables.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Integer, ResponseResult> next = iterator.next(); - ResponseResult result = next.getValue(); - - if ((result.getBeginTimestamp() + result.getTimeoutMillis()) <= System.currentTimeMillis()) { - iterator.remove(); + private void handleResponse(RemotingCommand response, RemotingCommand cmd, ChannelHandlerContext ctx) { + if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { + if (response != null) { try { - long timeoutMillis = result.getTimeoutMillis(); - long costTimeMillis = System.currentTimeMillis() - result.getBeginTimestamp(); - result.onTimeout(timeoutMillis, costTimeMillis); - LOG.error("scan response table command {} failed", result.getRequestId()); + writeAndFlush(ctx.channel(), response); } catch (Throwable e) { - LOG.warn("Error occurred when execute timeout callback !", e); - } finally { - result.release(); - LOG.warn("Removed timeout request {} ", result); + LOG.error(String.format("Process request %s success, but transfer response %s failed !", + cmd.toString(), response.toString()), e); } } } - */ + + } + + private void handleException(Throwable e, RemotingCommand cmd, ChannelHandlerContext ctx) { + if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { + //FiXME Exception interceptor can not throw exception + interceptorGroup.onException(new ExceptionContext(RemotingEndPoint.RESPONSE, extractRemoteAddress(ctx.channel()), cmd, e, "")); + } } public RemotingCommand invokeWithInterceptor(final Channel channel, final RemotingCommand request, @@ -358,6 +358,10 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } + private void writeAndFlush(final Channel channel, final Object msg, final ChannelFutureListener listener) { + channel.writeAndFlush(msg).addListener(listener); + } + public void invokeAsyncWithInterceptor(final Channel channel, final RemotingCommand request, final AsyncHandler invokeCallback, long timeoutMillis) { request.trafficType(TrafficType.REQUEST_ASYNC); @@ -486,13 +490,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } - public String getRemotingInstanceId() { - return remotingInstanceId; - } - @Override - public RemotingCommandFactory commandFactory() { - return this.remotingCommandFactory; + public void registerInterceptor(Interceptor interceptor) { + this.interceptorGroup.registerInterceptor(interceptor); } @Override @@ -514,27 +514,27 @@ public abstract class NettyRemotingAbstract implements RemotingService { } @Override - public String remotingInstanceId() { - return this.getRemotingInstanceId(); + public Pair<RequestProcessor, ExecutorService> processor(short requestCode) { + return processorTables.get(requestCode); } @Override - public void registerInterceptor(Interceptor interceptor) { - this.interceptorGroup.registerInterceptor(interceptor); + public String remotingInstanceId() { + return this.getRemotingInstanceId(); } - @Override - public void registerChannelEventListener(ChannelEventListener listener) { - this.channelEventListenerGroup.registerChannelEventListener(listener); + public String getRemotingInstanceId() { + return remotingInstanceId; } @Override - public Pair<RequestProcessor, ExecutorService> processor(short requestCode) { - return processorTables.get(requestCode); + public RemotingCommandFactory commandFactory() { + return this.remotingCommandFactory; } - protected String extractRemoteAddress(Channel channel) { - return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress(); + @Override + public void registerChannelEventListener(ChannelEventListener listener) { + this.channelEventListenerGroup.registerChannelEventListener(listener); } class ChannelEventExecutor extends Thread { diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index faead7f..3dab3db 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -88,6 +88,25 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); } + @Override + public void start() { + super.start(); + + this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass) + .handler(new ChannelInitializer<SocketChannel>() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(), + clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()), + new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler()); + } + }); + + applyOptions(clientBootstrap); + + startUpHouseKeepingService(); + } + private void applyOptions(Bootstrap bootstrap) { if (null != clientConfig) { if (clientConfig.getTcpSoLinger() > 0) { @@ -111,25 +130,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } @Override - public void start() { - super.start(); - - this.clientBootstrap.group(this.ioGroup).channel(socketChannelClass) - .handler(new ChannelInitializer<SocketChannel>() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(workerGroup, new Decoder(), new Encoder(), new IdleStateHandler(clientConfig.getConnectionChannelReaderIdleSeconds(), - clientConfig.getConnectionChannelWriterIdleSeconds(), clientConfig.getConnectionChannelIdleSeconds()), - new ClientConnectionHandler(), new EventDispatcher(), new ExceptionHandler()); - } - }); - - applyOptions(clientBootstrap); - - startUpHouseKeepingService(); - } - - @Override public void stop() { // try { ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); @@ -200,74 +200,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - private Channel createIfAbsent(final String addr) { - ChannelWrapper cw = this.channelTables.get(addr); - if (cw != null && cw.isActive()) { - return cw.getChannel(); - } - return this.createChannel(addr); - } - - //FIXME need test to verify - private Channel createChannel(final String addr) { - ChannelWrapper cw = null; - try { - if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - boolean createNewConnection; - cw = this.channelTables.get(addr); - if (cw != null) { - if (cw.isActive()) { - return cw.getChannel(); - } else if (!cw.getChannelFuture().isDone()) { - createNewConnection = false; - } else { - this.channelTables.remove(addr); - createNewConnection = true; - } - } else { - createNewConnection = true; - } - - if (createNewConnection) { - String[] s = addr.split(":"); - SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1])); - ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress); - LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr); - cw = new ChannelWrapper(channelFuture); - this.channelTables.put(addr, cw); - } - } catch (Exception e) { - LOG.error("createChannel: create channel exception", e); - } finally { - this.lockChannelTables.unlock(); - } - } else { - LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - - if (cw != null) { - ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) { - if (cw.isActive()) { - LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); - return cw.getChannel(); - } else { - LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause()); - this.closeChannel(addr, cw.getChannel()); - } - } else { - LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(), - channelFuture.toString()); - this.closeChannel(addr, cw.getChannel()); - } - } - return null; - } - private void closeChannel(final Channel channel) { if (null == channel) return; @@ -344,6 +276,74 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } + private Channel createIfAbsent(final String addr) { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isActive()) { + return cw.getChannel(); + } + return this.createChannel(addr); + } + + //FIXME need test to verify + private Channel createChannel(final String addr) { + ChannelWrapper cw = null; + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean createNewConnection; + cw = this.channelTables.get(addr); + if (cw != null) { + if (cw.isActive()) { + return cw.getChannel(); + } else if (!cw.getChannelFuture().isDone()) { + createNewConnection = false; + } else { + this.channelTables.remove(addr); + createNewConnection = true; + } + } else { + createNewConnection = true; + } + + if (createNewConnection) { + String[] s = addr.split(":"); + SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1])); + ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress); + LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr); + cw = new ChannelWrapper(channelFuture); + this.channelTables.put(addr, cw); + } + } catch (Exception e) { + LOG.error("createChannel: create channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + if (cw != null) { + ChannelFuture channelFuture = cw.getChannelFuture(); + if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) { + if (cw.isActive()) { + LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); + return cw.getChannel(); + } else { + LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause()); + this.closeChannel(addr, cw.getChannel()); + } + } else { + LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(), + channelFuture.toString()); + this.closeChannel(addr, cw.getChannel()); + } + } + return null; + } + @Override public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler, final long timeoutMillis) { @@ -403,12 +403,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private class ClientConnectionHandler extends ChannelDuplexHandler { @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(), - ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable()); - } - - @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { @@ -455,6 +449,12 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(), + ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable()); + } + + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOG.info("Close channel {} because of error {} ", ctx.channel(), cause); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index 0d6a2cc..ec8a243 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -94,36 +94,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); } - private void applyOptions(ServerBootstrap bootstrap) { - //option() is for the NioServerSocketChannel that accepts incoming connections. - //childOption() is for the Channels accepted by the parent ServerChannel, which is NioServerSocketChannel in this case - if (null != serverConfig) { - if (serverConfig.getTcpSoBacklogSize() > 0) { - bootstrap.option(ChannelOption.SO_BACKLOG, serverConfig.getTcpSoBacklogSize()); - } - - if (serverConfig.getTcpSoLinger() > 0) { - bootstrap.option(ChannelOption.SO_LINGER, serverConfig.getTcpSoLinger()); - } - - if (serverConfig.getTcpSoSndBufSize() > 0) { - bootstrap.childOption(ChannelOption.SO_SNDBUF, serverConfig.getTcpSoSndBufSize()); - } - if (serverConfig.getTcpSoRcvBufSize() > 0) { - bootstrap.childOption(ChannelOption.SO_RCVBUF, serverConfig.getTcpSoRcvBufSize()); - } - - bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()). - childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()). - childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()). - option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout()); - } - - if (serverConfig.isServerPooledBytebufAllocatorEnable()) { - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); - } - } - @Override public void start() { super.start(); @@ -157,6 +127,36 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti startUpHouseKeepingService(); } + private void applyOptions(ServerBootstrap bootstrap) { + //option() is for the NioServerSocketChannel that accepts incoming connections. + //childOption() is for the Channels accepted by the parent ServerChannel, which is NioServerSocketChannel in this case + if (null != serverConfig) { + if (serverConfig.getTcpSoBacklogSize() > 0) { + bootstrap.option(ChannelOption.SO_BACKLOG, serverConfig.getTcpSoBacklogSize()); + } + + if (serverConfig.getTcpSoLinger() > 0) { + bootstrap.option(ChannelOption.SO_LINGER, serverConfig.getTcpSoLinger()); + } + + if (serverConfig.getTcpSoSndBufSize() > 0) { + bootstrap.childOption(ChannelOption.SO_SNDBUF, serverConfig.getTcpSoSndBufSize()); + } + if (serverConfig.getTcpSoRcvBufSize() > 0) { + bootstrap.childOption(ChannelOption.SO_RCVBUF, serverConfig.getTcpSoRcvBufSize()); + } + + bootstrap.option(ChannelOption.SO_REUSEADDR, serverConfig.isTcpSoReuseAddress()). + childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isTcpSoKeepAlive()). + childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpSoNoDelay()). + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, serverConfig.getTcpSoTimeout()); + } + + if (serverConfig.isServerPooledBytebufAllocatorEnable()) { + bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + } + } + @Override public void stop() { try { @@ -203,12 +203,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private class ServerConnectionHandler extends ChannelDuplexHandler { @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(), - ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable()); - } - - @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); } @@ -249,6 +243,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @Override + public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { + LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(), + ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable()); + } + + @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause)); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java index c298ce7..0688dfa 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ByteUtils.java @@ -34,47 +34,47 @@ public final class ByteUtils { } /** - * Compare two byte arrays (perform null checks beforehand). + * Compare two two-dimensional byte arrays. No null checks are performed. * * @param left the first byte array * @param right the second byte array * @return the result of the comparison */ - public static boolean equals(byte[] left, byte[] right) { - if (left == null) { - return right == null; - } - if (right == null) { - return false; - } - + public static boolean equals(byte[][] left, byte[][] right) { if (left.length != right.length) { return false; } + boolean result = true; for (int i = left.length - 1; i >= 0; i--) { - result &= left[i] == right[i]; + result &= ByteUtils.equals(left[i], right[i]); } + return result; } /** - * Compare two two-dimensional byte arrays. No null checks are performed. + * Compare two byte arrays (perform null checks beforehand). * * @param left the first byte array * @param right the second byte array * @return the result of the comparison */ - public static boolean equals(byte[][] left, byte[][] right) { - if (left.length != right.length) { + public static boolean equals(byte[] left, byte[] right) { + if (left == null) { + return right == null; + } + if (right == null) { return false; } + if (left.length != right.length) { + return false; + } boolean result = true; for (int i = left.length - 1; i >= 0; i--) { - result &= ByteUtils.equals(left[i], right[i]); + result &= left[i] == right[i]; } - return result; } @@ -104,16 +104,16 @@ public final class ByteUtils { } /** - * Computes a hashcode based on the contents of a one-dimensional byte array - * rather than its identity. + * Computes a hashcode based on the contents of a three-dimensional byte + * array rather than its identity. * * @param array the array to compute the hashcode of * @return the hashcode */ - public static int deepHashCode(byte[] array) { + public static int deepHashCode(byte[][][] array) { int result = 1; for (int i = 0; i < array.length; i++) { - result = 31 * result + array[i]; + result = 31 * result + deepHashCode(array[i]); } return result; } @@ -134,16 +134,16 @@ public final class ByteUtils { } /** - * Computes a hashcode based on the contents of a three-dimensional byte - * array rather than its identity. + * Computes a hashcode based on the contents of a one-dimensional byte array + * rather than its identity. * * @param array the array to compute the hashcode of * @return the hashcode */ - public static int deepHashCode(byte[][][] array) { + public static int deepHashCode(byte[] array) { int result = 1; for (int i = 0; i < array.length; i++) { - result = 31 * result + deepHashCode(array[i]); + result = 31 * result + array[i]; } return result; } @@ -281,7 +281,7 @@ public final class ByteUtils { * * @param x1 the first array * @param x2 the second array - * @return (x2||x1) (little-endian order, i.e. x1 is at lower memory + * @return (x2 | | x1) (little-endian order, i.e. x1 is at lower memory * addresses) */ public static byte[] concatenate(byte[] x1, byte[] x2) { @@ -339,14 +339,11 @@ public final class ByteUtils { * * @param input the input byte array * @param start the start index - * @param end the end index - * @return a subarray of <tt>input</tt>, ranging from <tt>start</tt> - * (inclusively) to <tt>end</tt> (exclusively) + * @return a subarray of <tt>input</tt>, ranging from <tt>start</tt> to + * the end of the array */ - public static byte[] subArray(byte[] input, int start, int end) { - byte[] result = new byte[end - start]; - System.arraycopy(input, start, result, 0, end - start); - return result; + public static byte[] subArray(byte[] input, int start) { + return subArray(input, start, input.length); } /** @@ -354,11 +351,14 @@ public final class ByteUtils { * * @param input the input byte array * @param start the start index - * @return a subarray of <tt>input</tt>, ranging from <tt>start</tt> to - * the end of the array + * @param end the end index + * @return a subarray of <tt>input</tt>, ranging from <tt>start</tt> + * (inclusively) to <tt>end</tt> (exclusively) */ - public static byte[] subArray(byte[] input, int start) { - return subArray(input, start, input.length); + public static byte[] subArray(byte[] input, int start, int end) { + byte[] result = new byte[end - start]; + System.arraycopy(input, start, result, 0, end - start); + return result; } /** diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java index 6386ca0..dc17d05 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/ExceptionUtils.java @@ -28,25 +28,6 @@ public class ExceptionUtils { private static final String LINE_SEPARATOR = System.getProperty("line.separator"); /** - * <p>Gets the stack trace from a Throwable as a String.</p> - * - * <p>The result of this method vary by JDK version as this method - * uses {@link Throwable#printStackTrace(java.io.PrintWriter)}. - * On JDK1.3 and earlier, the cause exception will not be shown - * unless the specified throwable alters printStackTrace.</p> - * - * @param throwable the <code>Throwable</code> to be examined - * @return the stack trace as generated by the exception's - * <code>printStackTrace(PrintWriter)</code> method - */ - public static String getStackTrace(final Throwable throwable) { - final StringWriter sw = new StringWriter(); - final PrintWriter pw = new PrintWriter(sw, true); - throwable.printStackTrace(pw); - return sw.getBuffer().toString(); - } - - /** * <p>Produces a <code>List</code> of stack frames - the message * is not included. Only the trace of the specified exception is * returned, any caused by trace is stripped.</p> @@ -77,4 +58,23 @@ public class ExceptionUtils { } return list; } + + /** + * <p>Gets the stack trace from a Throwable as a String.</p> + * + * <p>The result of this method vary by JDK version as this method + * uses {@link Throwable#printStackTrace(java.io.PrintWriter)}. + * On JDK1.3 and earlier, the cause exception will not be shown + * unless the specified throwable alters printStackTrace.</p> + * + * @param throwable the <code>Throwable</code> to be examined + * @return the stack trace as generated by the exception's + * <code>printStackTrace(PrintWriter)</code> method + */ + public static String getStackTrace(final Throwable throwable) { + final StringWriter sw = new StringWriter(); + final PrintWriter pw = new PrintWriter(sw, true); + throwable.printStackTrace(pw); + return sw.getBuffer().toString(); + } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java index a4b1293..b64ab10 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java @@ -55,21 +55,13 @@ public class UIDGenerator { counter = 0; } - public static UIDGenerator instance() { - return generatorLocal.get(); - } - - public String createUID() { - long current = System.currentTimeMillis(); - if (current >= nextStartTime) { - setStartTime(current); - } - buffer.position(0); - sb.setLength(basePos); - buffer.putInt((int) (System.currentTimeMillis() - startTime)); - buffer.putShort(counter++); - sb.append(ByteUtils.toHexString(buffer.array())); - return sb.toString(); + public byte[] createFakeIP() { + ByteBuffer bb = ByteBuffer.allocate(8); + bb.putLong(System.currentTimeMillis()); + bb.position(4); + byte[] fakeIP = new byte[4]; + bb.get(fakeIP); + return fakeIP; } private void setStartTime(long millis) { @@ -85,13 +77,21 @@ public class UIDGenerator { nextStartTime = cal.getTimeInMillis(); } - public byte[] createFakeIP() { - ByteBuffer bb = ByteBuffer.allocate(8); - bb.putLong(System.currentTimeMillis()); - bb.position(4); - byte[] fakeIP = new byte[4]; - bb.get(fakeIP); - return fakeIP; + public static UIDGenerator instance() { + return generatorLocal.get(); + } + + public String createUID() { + long current = System.currentTimeMillis(); + if (current >= nextStartTime) { + setStartTime(current); + } + buffer.position(0); + sb.setLength(basePos); + buffer.putInt((int) (System.currentTimeMillis() - startTime)); + buffer.putShort(counter++); + sb.append(ByteUtils.toHexString(buffer.array())); + return sb.toString(); } }
