This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit ca20b84a67c49c80baa2296b70f7df6af3913d4b Author: yukon <[email protected]> AuthorDate: Fri May 17 15:38:18 2019 +0800 Reformat code --- README.md | 4 +- .../rocketmq/remoting/api/RemotingServer.java | 3 +- .../api/exception/NestedRuntimeException.java | 18 ++--- .../rocketmq/remoting/common/TypePresentation.java | 73 ------------------ .../rocketmq/remoting/common/ResponseFuture.java | 3 +- .../remoting/impl/netty/NettyRemotingAbstract.java | 13 ++-- .../remoting/impl/netty/NettyRemotingClient.java | 88 +++++++++++----------- .../remoting/impl/netty/NettyRemotingServer.java | 38 +++++----- .../rocketmq/remoting/internal/UIDGenerator.java | 8 +- 9 files changed, 88 insertions(+), 160 deletions(-) diff --git a/README.md b/README.md index 2b8325e..6d4c603 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ -## Apache RocketMQ XXX +## Apache RocketMQ X -Apache RocketMQ XXX is the next generation of RocketMQ, which is extremely simple and high available. \ No newline at end of file +Apache RocketMQ X is the next generation of RocketMQ, which is extremely simple and high available. \ No newline at end of file diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java index 785f83e..11b87bd 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/RemotingServer.java @@ -25,7 +25,8 @@ public interface RemotingServer extends RemotingService { RemotingCommand invoke(RemotingChannel remotingChannel, RemotingCommand request, long timeoutMillis); - void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler, long timeoutMillis); + void invokeAsync(RemotingChannel remotingChannel, RemotingCommand request, AsyncHandler asyncHandler, + long timeoutMillis); void invokeOneWay(RemotingChannel remotingChannel, RemotingCommand request); } diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java index 7179c91..7ef01db 100644 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java +++ b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/api/exception/NestedRuntimeException.java @@ -51,15 +51,6 @@ public abstract class NestedRuntimeException extends RuntimeException { } /** - * Return the detail message, including the message from the nested exception - * if there is one. - */ - @Override - public String getMessage() { - return getMessageWithCause(super.getMessage(), getCause()); - } - - /** * Build a message for the given base message and root cause. * * @param message the base message @@ -80,6 +71,15 @@ public abstract class NestedRuntimeException extends RuntimeException { } /** + * Return the detail message, including the message from the nested exception + * if there is one. + */ + @Override + public String getMessage() { + return getMessageWithCause(super.getMessage(), getCause()); + } + + /** * Retrieve the innermost cause of this exception, if any. * * @return the innermost exception, or {@code null} if none diff --git a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java b/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java deleted file mode 100644 index ef3d5f8..0000000 --- a/remoting-core/remoting-api/src/main/java/org/apache/rocketmq/remoting/common/TypePresentation.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.rocketmq.remoting.common; - -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Represents a generic type {@code T}. Java doesn't yet provide a way to - * represent generic types, so this class does. Forces clients to create a - * subclass of this class which enables retrieval the type information even at - * runtime. - * - * <p>For example, to create a type literal for {@code List<String>}, you can - * create an empty anonymous inner class: - * - * <pre> - * TypePresentation<List<String>> list = new TypePresentation<List<String>>() {}; - * </pre> - * - * To create a type literal for {@code Map<String, Integer>}: - * - * <pre> - * TypePresentation<Map<String, Integer>> map = new TypePresentation<Map<String, Integer>>() {}; - * </pre> - * - * This syntax cannot be used to create type literals that have wildcard - * parameters, such as {@code Class<?>} or {@code List<? extends CharSequence>}. - * - * @since 1.0.0 - */ -public class TypePresentation<T> { - static ConcurrentMap<Class<?>, ConcurrentMap<Type, ConcurrentMap<Type, Type>>> classTypeCache - = new ConcurrentHashMap<Class<?>, ConcurrentMap<Type, ConcurrentMap<Type, Type>>>(16, 0.75f, 1); - protected final Type type; - - /** - * Constructs a new type literal. Derives represented class from type - * parameter. - * - * <p>Clients create an empty anonymous subclass. Doing so embeds the type - * parameter in the anonymous class's type hierarchy so we can reconstitute it - * at runtime despite erasure. - */ - protected TypePresentation() { - Type superClass = getClass().getGenericSuperclass(); - type = ((ParameterizedType) superClass).getActualTypeArguments()[0]; - } - - /** - * @return underlying {@code Type} instance. - */ - public Type getType() { - return type; - } -} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java index e614963..014dd78 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/ResponseFuture.java @@ -45,7 +45,8 @@ public class ResponseFuture { private InterceptorGroup interceptorGroup; private String remoteAddr; - public ResponseFuture(int requestId, long timeoutMillis, AsyncHandler asyncHandler, @Nullable SemaphoreReleaseOnlyOnce once) { + public ResponseFuture(int requestId, long timeoutMillis, AsyncHandler asyncHandler, + @Nullable SemaphoreReleaseOnlyOnce once) { this.requestId = requestId; this.timeoutMillis = timeoutMillis; this.asyncHandler = asyncHandler; diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index ac989f8..4c72b78 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -111,7 +111,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { } } - for (Integer requestID: rList) { + for (Integer requestID : rList) { ResponseFuture rf = this.ackTables.remove(requestID); if (rf != null) { @@ -360,8 +360,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause()); - } - else { + } else { throw new RemoteAccessException(extractRemoteAddress(channel), responseFuture.getCause()); } } @@ -494,15 +493,15 @@ public abstract class NettyRemotingAbstract implements RemotingService { return this.getRemotingInstanceId(); } - public String getRemotingInstanceId() { - return remotingInstanceId; - } - @Override public RemotingCommandFactory commandFactory() { return this.remotingCommandFactory; } + public String getRemotingInstanceId() { + return remotingInstanceId; + } + @Override public void registerChannelEventListener(ChannelEventListener listener) { this.channelEventListenerGroup.registerChannelEventListener(listener); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index f098de3..44788c3 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -112,28 +112,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti startUpHouseKeepingService(); } - private void applyOptions(Bootstrap bootstrap) { - if (null != clientConfig) { - if (clientConfig.getTcpSoLinger() > 0) { - bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger()); - } - - if (clientConfig.getTcpSoSndBufSize() > 0) { - bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize()); - } - if (clientConfig.getTcpSoRcvBufSize() > 0) { - bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize()); - } - - bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()). - option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()). - option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()). - option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()). - option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(), - clientConfig.getWriteBufHighWaterMark())); - } - } - @Override public void stop() { try { @@ -157,6 +135,28 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti super.stop(); } + private void applyOptions(Bootstrap bootstrap) { + if (null != clientConfig) { + if (clientConfig.getTcpSoLinger() > 0) { + bootstrap.option(ChannelOption.SO_LINGER, clientConfig.getTcpSoLinger()); + } + + if (clientConfig.getTcpSoSndBufSize() > 0) { + bootstrap.option(ChannelOption.SO_SNDBUF, clientConfig.getTcpSoSndBufSize()); + } + if (clientConfig.getTcpSoRcvBufSize() > 0) { + bootstrap.option(ChannelOption.SO_RCVBUF, clientConfig.getTcpSoRcvBufSize()); + } + + bootstrap.option(ChannelOption.SO_REUSEADDR, clientConfig.isTcpSoReuseAddress()). + option(ChannelOption.SO_KEEPALIVE, clientConfig.isTcpSoKeepAlive()). + option(ChannelOption.TCP_NODELAY, clientConfig.isTcpSoNoDelay()). + option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getTcpSoTimeout()). + option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(clientConfig.getWriteBufLowWaterMark(), + clientConfig.getWriteBufHighWaterMark())); + } + } + private void closeChannel(final String addr, final Channel channel) { if (null == channel) return; @@ -277,6 +277,28 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } + @Override + public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler, + final long timeoutMillis) { + + final Channel channel = this.createIfAbsent(address); + if (channel != null && channel.isActive()) { + this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis); + } else { + this.closeChannel(address, channel); + } + } + + @Override + public void invokeOneWay(final String address, final RemotingCommand request) { + final Channel channel = this.createIfAbsent(address); + if (channel != null && channel.isActive()) { + this.invokeOnewayWithInterceptor(channel, request); + } else { + this.closeChannel(address, channel); + } + } + private Channel createIfAbsent(final String addr) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isActive()) { @@ -345,28 +367,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti return null; } - @Override - public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler, - final long timeoutMillis) { - - final Channel channel = this.createIfAbsent(address); - if (channel != null && channel.isActive()) { - this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis); - } else { - this.closeChannel(address, channel); - } - } - - @Override - public void invokeOneWay(final String address, final RemotingCommand request) { - final Channel channel = this.createIfAbsent(address); - if (channel != null && channel.isActive()) { - this.invokeOnewayWithInterceptor(channel, request); - } else { - this.closeChannel(address, channel); - } - } - private class ChannelWrapper { private final ChannelFuture channelFuture; diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index 40e3cb7..6b763cf 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -127,6 +127,25 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti startUpHouseKeepingService(); } + @Override + public void stop() { + try { + ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); + + ThreadUtils.shutdownGracefully(channelEventExecutor); + + this.bossGroup.shutdownGracefully().syncUninterruptibly(); + + this.ioGroup.shutdownGracefully().syncUninterruptibly(); + + this.workerGroup.shutdownGracefully().syncUninterruptibly(); + } catch (Exception e) { + LOG.warn("RemotingServer stopped error !", e); + } + + super.stop(); + } + private void applyOptions(ServerBootstrap bootstrap) { //option() is for the NioServerSocketChannel that accepts incoming connections. //childOption() is for the Channels accepted by the parent ServerChannel, which is NioServerSocketChannel in this case @@ -158,25 +177,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @Override - public void stop() { - try { - ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); - - ThreadUtils.shutdownGracefully(channelEventExecutor); - - this.bossGroup.shutdownGracefully().syncUninterruptibly(); - - this.ioGroup.shutdownGracefully().syncUninterruptibly(); - - this.workerGroup.shutdownGracefully().syncUninterruptibly(); - } catch (Exception e) { - LOG.warn("RemotingServer stopped error !", e); - } - - super.stop(); - } - - @Override public int localListenPort() { return this.port; } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java index b64ab10..55d8cda 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/UIDGenerator.java @@ -55,6 +55,10 @@ public class UIDGenerator { counter = 0; } + public static UIDGenerator instance() { + return generatorLocal.get(); + } + public byte[] createFakeIP() { ByteBuffer bb = ByteBuffer.allocate(8); bb.putLong(System.currentTimeMillis()); @@ -77,10 +81,6 @@ public class UIDGenerator { nextStartTime = cal.getTimeInMillis(); } - public static UIDGenerator instance() { - return generatorLocal.get(); - } - public String createUID() { long current = System.currentTimeMillis(); if (current >= nextStartTime) {
