This is an automated email from the ASF dual-hosted git repository. yukon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/rocketmq-remoting.git
commit c2e49dbc201113113de02d9412ef948b9b3b3207 Author: yukon <[email protected]> AuthorDate: Tue May 28 10:45:37 2019 +0800 Pull client channel management logic to ClientChannelManager --- .../remoting/impl/netty/ClientChannelManager.java | 241 +++++++++++++++++++++ .../remoting/impl/netty/NettyRemotingAbstract.java | 32 ++- .../remoting/impl/netty/NettyRemotingClient.java | 233 ++------------------ .../remoting/impl/netty/NettyRemotingServer.java | 6 - .../rocketmq/remoting/internal/RemotingUtil.java | 27 +++ 5 files changed, 299 insertions(+), 240 deletions(-) diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java new file mode 100644 index 0000000..2f59d24 --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.impl.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.rocketmq.remoting.config.RemotingConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.rocketmq.remoting.internal.RemotingUtil.extractRemoteAddress; + +public class ClientChannelManager { + protected static final Logger LOG = LoggerFactory.getLogger(ClientChannelManager.class); + + private static final long LOCK_TIMEOUT_MILLIS = 3000; + private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); + private final Lock lockChannelTables = new ReentrantLock(); + private final Bootstrap clientBootstrap; + private final RemotingConfig clientConfig; + + ClientChannelManager(final Bootstrap bootstrap, + final RemotingConfig config) { + clientBootstrap = bootstrap; + clientConfig = config; + } + + void clear() { + for (ChannelWrapper cw : this.channelTables.values()) { + this.closeChannel(null, cw.getChannel()); + } + + this.channelTables.clear(); + } + + Channel createIfAbsent(final String addr) { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isActive()) { + return cw.getChannel(); + } + return this.createChannel(addr); + } + + private Channel createChannel(final String addr) { + ChannelWrapper cw = null; + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean createNewConnection; + cw = this.channelTables.get(addr); + if (cw != null) { + if (cw.isActive()) { + return cw.getChannel(); + } else if (!cw.getChannelFuture().isDone()) { + createNewConnection = false; + } else { + this.channelTables.remove(addr); + createNewConnection = true; + } + } else { + createNewConnection = true; + } + + if (createNewConnection) { + String[] s = addr.split(":"); + SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1])); + ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress); + LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr); + cw = new ChannelWrapper(channelFuture); + this.channelTables.put(addr, cw); + } + } catch (Exception e) { + LOG.error("createChannel: create channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } + + if (cw != null) { + ChannelFuture channelFuture = cw.getChannelFuture(); + if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) { + if (cw.isActive()) { + LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); + return cw.getChannel(); + } else { + LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause()); + this.closeChannel(addr, cw.getChannel()); + } + } else { + LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(), + channelFuture.toString()); + this.closeChannel(addr, cw.getChannel()); + } + } + return null; + } + + void closeChannel(final String addr, final Channel channel) { + if (null == channel) + return; + + final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr; + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean removeItemFromTable = true; + ChannelWrapper prevCW = this.channelTables.get(addrRemote); + //Workaround for null + if (null == prevCW) { + return; + } + + LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW); + + if (prevCW.getChannel() != channel) { + LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel); + removeItemFromTable = false; + } + + if (removeItemFromTable) { + this.channelTables.remove(addrRemote); + LOG.info("Channel {} has been removed !", addrRemote); + } + + channel.close().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + LOG.warn("Close channel {} {}", channel, future.isSuccess()); + } + }); + } catch (Exception e) { + LOG.error("Close channel error !", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + LOG.error("Close channel error !", e); + } + } + + void closeChannel(final Channel channel) { + if (null == channel) + return; + + try { + if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { + try { + boolean removeItemFromTable = true; + ChannelWrapper prevCW = null; + String addrRemote = null; + + for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) { + ChannelWrapper prev = entry.getValue(); + if (prev.getChannel() != null) { + if (prev.getChannel() == channel) { + prevCW = prev; + addrRemote = entry.getKey(); + break; + } + } + } + + if (null == prevCW) { + LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote); + removeItemFromTable = false; + } + + if (removeItemFromTable) { + this.channelTables.remove(addrRemote); + LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); + //RemotingHelper.closeChannel(channel); + } + } catch (Exception e) { + LOG.error("closeChannel: close the channel exception", e); + } finally { + this.lockChannelTables.unlock(); + } + } else { + LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); + } + } catch (InterruptedException e) { + LOG.error("closeChannel exception", e); + } + } + + private class ChannelWrapper { + private final ChannelFuture channelFuture; + + ChannelWrapper(ChannelFuture channelFuture) { + this.channelFuture = channelFuture; + } + + boolean isActive() { + return this.channelFuture.channel() != null && this.channelFuture.channel().isActive(); + } + + boolean isWriteable() { + return this.channelFuture.channel().isWritable(); + } + + private Channel getChannel() { + return this.channelFuture.channel(); + } + + ChannelFuture getChannelFuture() { + return channelFuture; + } + } +} diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 38059a8..920a922 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; @@ -58,6 +57,7 @@ import org.apache.rocketmq.remoting.external.ThreadUtils; import org.apache.rocketmq.remoting.impl.channel.NettyChannelImpl; import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl; import org.apache.rocketmq.remoting.impl.command.RemotingSysResponseCode; +import org.apache.rocketmq.remoting.internal.RemotingUtil; import org.apache.rocketmq.remoting.internal.UIDGenerator; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -88,7 +88,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { } protected void putNettyEvent(final NettyChannelEvent event) { - this.channelEventExecutor.putNettyEvent(event); + if (channelEventListenerGroup != null && channelEventListenerGroup.size() != 0) { + this.channelEventExecutor.putNettyEvent(event); + } } protected void startUpHouseKeepingService() { @@ -172,7 +174,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { processorExecutorPair.getRight().submit(run); } catch (RejectedExecutionException e) { LOG.warn(String.format("Request %s from %s Rejected by server executor %s !", cmd, - extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString())); + RemotingUtil.extractRemoteAddress(ctx.channel()), processorExecutorPair.getRight().toString())); if (cmd.trafficType() != TrafficType.REQUEST_ONEWAY) { RemotingCommand response = remotingCommandFactory.createResponse(cmd); @@ -190,7 +192,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { responseFuture.release(); this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, - extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response)); + RemotingUtil.extractRemoteAddress(ctx.channel()), responseFuture.getRequestCommand(), response)); if (responseFuture.getAsyncHandler() != null) { executeAsyncHandler(responseFuture); @@ -199,7 +201,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { responseFuture.release(); } } else { - LOG.warn("request {} from {} has not matched response !", response, extractRemoteAddress(ctx.channel())); + LOG.warn("request {} from {} has not matched response !", response, RemotingUtil.extractRemoteAddress(ctx.channel())); } } @@ -211,12 +213,12 @@ public abstract class NettyRemotingAbstract implements RemotingService { public void run() { try { interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.RESPONSE, - extractRemoteAddress(ctx.channel()), cmd)); + RemotingUtil.extractRemoteAddress(ctx.channel()), cmd)); RemotingCommand response = processorExecutorPair.getLeft().processRequest(channel, cmd); interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.RESPONSE, - extractRemoteAddress(ctx.channel()), cmd, response)); + RemotingUtil.extractRemoteAddress(ctx.channel()), cmd, response)); handleResponse(response, cmd, ctx); } catch (Throwable e) { @@ -228,10 +230,6 @@ public abstract class NettyRemotingAbstract implements RemotingService { }; } - protected String extractRemoteAddress(Channel channel) { - return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress(); - } - private void writeAndFlush(final Channel channel, final Object msg) { channel.writeAndFlush(msg); } @@ -321,14 +319,14 @@ public abstract class NettyRemotingAbstract implements RemotingService { long timeoutMillis) { request.trafficType(TrafficType.REQUEST_SYNC); - final String remoteAddr = extractRemoteAddress(channel); + final String remoteAddr = RemotingUtil.extractRemoteAddress(channel); this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); RemotingCommand responseCommand = this.invoke0(remoteAddr, channel, request, timeoutMillis); this.interceptorGroup.afterResponseReceived(new ResponseContext(RemotingEndPoint.REQUEST, - extractRemoteAddress(channel), request, responseCommand)); + RemotingUtil.extractRemoteAddress(channel), request, responseCommand)); return responseCommand; } @@ -367,9 +365,9 @@ public abstract class NettyRemotingAbstract implements RemotingService { if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { - throw new RemoteTimeoutException(extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause()); + throw new RemoteTimeoutException(RemotingUtil.extractRemoteAddress(channel), timeoutMillis, responseFuture.getCause()); } else { - throw new RemoteAccessException(extractRemoteAddress(channel), responseFuture.getCause()); + throw new RemoteAccessException(RemotingUtil.extractRemoteAddress(channel), responseFuture.getCause()); } } @@ -387,7 +385,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { final AsyncHandler invokeCallback, long timeoutMillis) { request.trafficType(TrafficType.REQUEST_ASYNC); - final String remoteAddr = extractRemoteAddress(channel); + final String remoteAddr = RemotingUtil.extractRemoteAddress(channel); this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, remoteAddr, request)); @@ -436,7 +434,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { public void invokeOnewayWithInterceptor(final Channel channel, final RemotingCommand request) { request.trafficType(TrafficType.REQUEST_ONEWAY); - this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, extractRemoteAddress(channel), request)); + this.interceptorGroup.beforeRequest(new RequestContext(RemotingEndPoint.REQUEST, RemotingUtil.extractRemoteAddress(channel), request)); this.invokeOneway0(channel, request); } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java index c263f22..ce30aa2 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingClient.java @@ -20,8 +20,6 @@ package org.apache.rocketmq.remoting.impl.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -38,13 +36,8 @@ import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; -import java.net.InetSocketAddress; import java.net.SocketAddress; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.command.RemotingCommand; @@ -59,16 +52,14 @@ import org.apache.rocketmq.remoting.impl.netty.handler.ExceptionHandler; import org.apache.rocketmq.remoting.internal.JvmUtils; public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient { - private static final long LOCK_TIMEOUT_MILLIS = 3000; private final Bootstrap clientBootstrap = new Bootstrap(); private final EventLoopGroup ioGroup; private final Class<? extends SocketChannel> socketChannelClass; private final RemotingConfig clientConfig; - private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>(); - private final Lock lockChannelTables = new ReentrantLock(); private EventExecutorGroup workerGroup; + private ClientChannelManager clientChannelManager; public NettyRemotingClient(final RemotingConfig clientConfig) { super(clientConfig); @@ -84,6 +75,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti socketChannelClass = NioSocketChannel.class; } + this.clientChannelManager = new ClientChannelManager(clientBootstrap, clientConfig); + this.workerGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); } @@ -117,11 +110,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti try { ThreadUtils.shutdownGracefully(houseKeepingService, 3000, TimeUnit.MILLISECONDS); - for (ChannelWrapper cw : this.channelTables.values()) { - this.closeChannel(null, cw.getChannel()); - } - - this.channelTables.clear(); + clientChannelManager.clear(); this.ioGroup.shutdownGracefully(); @@ -157,102 +146,11 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } } - private void closeChannel(final String addr, final Channel channel) { - if (null == channel) - return; - - final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr; - try { - if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - boolean removeItemFromTable = true; - ChannelWrapper prevCW = this.channelTables.get(addrRemote); - //Workaround for null - if (null == prevCW) { - return; - } - - LOG.info("Begin to close the remote address {} channel {}", addrRemote, prevCW); - - if (prevCW.getChannel() != channel) { - LOG.info("Channel {} has been closed,this is a new channel.", prevCW.getChannel(), channel); - removeItemFromTable = false; - } - - if (removeItemFromTable) { - this.channelTables.remove(addrRemote); - LOG.info("Channel {} has been removed !", addrRemote); - } - - channel.close().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - LOG.warn("Close channel {} {}", channel, future.isSuccess()); - } - }); - } catch (Exception e) { - LOG.error("Close channel error !", e); - } finally { - this.lockChannelTables.unlock(); - } - } else { - LOG.warn("Can not lock channel table in {} ms", LOCK_TIMEOUT_MILLIS); - } - } catch (InterruptedException e) { - LOG.error("Close channel error !", e); - } - } - - private void closeChannel(final Channel channel) { - if (null == channel) - return; - - try { - if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - boolean removeItemFromTable = true; - ChannelWrapper prevCW = null; - String addrRemote = null; - - for (Map.Entry<String, ChannelWrapper> entry : channelTables.entrySet()) { - ChannelWrapper prev = entry.getValue(); - if (prev.getChannel() != null) { - if (prev.getChannel() == channel) { - prevCW = prev; - addrRemote = entry.getKey(); - break; - } - } - } - - if (null == prevCW) { - LOG.info("eventCloseChannel: the channel[{}] has been removed from the channel table before", addrRemote); - removeItemFromTable = false; - } - - if (removeItemFromTable) { - this.channelTables.remove(addrRemote); - LOG.info("closeChannel: the channel[{}] was removed from channel table", addrRemote); - //RemotingHelper.closeChannel(channel); - } - } catch (Exception e) { - LOG.error("closeChannel: close the channel exception", e); - } finally { - this.lockChannelTables.unlock(); - } - } else { - LOG.warn("closeChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); - } - } catch (InterruptedException e) { - LOG.error("closeChannel exception", e); - } - } - @Override public RemotingCommand invoke(final String address, final RemotingCommand request, final long timeoutMillis) { request.trafficType(TrafficType.REQUEST_SYNC); - Channel channel = this.createIfAbsent(address); + Channel channel = this.clientChannelManager.createIfAbsent(address); if (channel != null && channel.isActive()) { try { return this.invokeWithInterceptor(channel, request, timeoutMillis); @@ -260,18 +158,18 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } catch (RemoteTimeoutException e) { if (this.clientConfig.isClientCloseSocketIfTimeout()) { LOG.warn("invoke: timeout, so close the socket {} ms, {}", timeoutMillis, address); - this.closeChannel(address, channel); + this.clientChannelManager.closeChannel(address, channel); } LOG.warn("invoke: wait response timeout<{}ms> exception, so close the channel[{}]", timeoutMillis, address); throw e; } finally { if (this.clientConfig.isClientShortConnectionEnable()) { - this.closeChannel(address, channel); + this.clientChannelManager.closeChannel(address, channel); } } } else { - this.closeChannel(address, channel); + this.clientChannelManager.closeChannel(address, channel); throw new RemoteConnectFailureException(address); } @@ -281,113 +179,21 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti public void invokeAsync(final String address, final RemotingCommand request, final AsyncHandler asyncHandler, final long timeoutMillis) { - final Channel channel = this.createIfAbsent(address); + final Channel channel = this.clientChannelManager.createIfAbsent(address); if (channel != null && channel.isActive()) { this.invokeAsyncWithInterceptor(channel, request, asyncHandler, timeoutMillis); } else { - this.closeChannel(address, channel); + this.clientChannelManager.closeChannel(address, channel); } } @Override public void invokeOneWay(final String address, final RemotingCommand request) { - final Channel channel = this.createIfAbsent(address); + final Channel channel = this.clientChannelManager.createIfAbsent(address); if (channel != null && channel.isActive()) { this.invokeOnewayWithInterceptor(channel, request); } else { - this.closeChannel(address, channel); - } - } - - private Channel createIfAbsent(final String addr) { - ChannelWrapper cw = this.channelTables.get(addr); - if (cw != null && cw.isActive()) { - return cw.getChannel(); - } - return this.createChannel(addr); - } - - //FIXME need test to verify - private Channel createChannel(final String addr) { - ChannelWrapper cw = null; - try { - if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { - try { - boolean createNewConnection; - cw = this.channelTables.get(addr); - if (cw != null) { - if (cw.isActive()) { - return cw.getChannel(); - } else if (!cw.getChannelFuture().isDone()) { - createNewConnection = false; - } else { - this.channelTables.remove(addr); - createNewConnection = true; - } - } else { - createNewConnection = true; - } - - if (createNewConnection) { - String[] s = addr.split(":"); - SocketAddress socketAddress = new InetSocketAddress(s[0], Integer.valueOf(s[1])); - ChannelFuture channelFuture = this.clientBootstrap.connect(socketAddress); - LOG.info("createChannel: begin to connect remote host[{}] asynchronously", addr); - cw = new ChannelWrapper(channelFuture); - this.channelTables.put(addr, cw); - } - } catch (Exception e) { - LOG.error("createChannel: create channel exception", e); - } finally { - this.lockChannelTables.unlock(); - } - } else { - LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); - } - } catch (InterruptedException e) { - e.printStackTrace(); - } - - if (cw != null) { - ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis())) { - if (cw.isActive()) { - LOG.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); - return cw.getChannel(); - } else { - LOG.warn("createChannel: connect remote host[" + addr + "] failed, and destroy the channel" + channelFuture.toString(), channelFuture.cause()); - this.closeChannel(addr, cw.getChannel()); - } - } else { - LOG.warn("createChannel: connect remote host[{}] timeout {}ms, {}, and destroy the channel", addr, this.clientConfig.getClientConnectionFutureAwaitTimeoutMillis(), - channelFuture.toString()); - this.closeChannel(addr, cw.getChannel()); - } - } - return null; - } - - private class ChannelWrapper { - private final ChannelFuture channelFuture; - - ChannelWrapper(ChannelFuture channelFuture) { - this.channelFuture = channelFuture; - } - - boolean isActive() { - return this.channelFuture.channel() != null && this.channelFuture.channel().isActive(); - } - - boolean isWriteable() { - return this.channelFuture.channel().isWritable(); - } - - private Channel getChannel() { - return this.channelFuture.channel(); - } - - ChannelFuture getChannelFuture() { - return channelFuture; + this.clientChannelManager.closeChannel(address, channel); } } @@ -407,7 +213,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { LOG.info("Remote address {} disconnect channel {}.", ctx.channel().remoteAddress(), ctx.channel()); - closeChannel(ctx.channel()); + NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel()); super.disconnect(ctx, promise); @@ -418,7 +224,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { LOG.info("Remote address {} close channel {}.", ctx.channel().remoteAddress(), ctx.channel()); - closeChannel(ctx.channel()); + NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel()); super.close(ctx, promise); @@ -431,7 +237,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.ALL_IDLE)) { LOG.info("Close channel {} because of idle event {} ", ctx.channel(), event); - closeChannel(ctx.channel()); + NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel()); putNettyEvent(new NettyChannelEvent(NettyChannelEventType.IDLE, ctx.channel())); } } @@ -440,16 +246,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti } @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(), - ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable()); - } - - @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { LOG.info("Close channel {} because of error {} ", ctx.channel(), cause); - - closeChannel(ctx.channel()); + NettyRemotingClient.this.clientChannelManager.closeChannel(ctx.channel()); putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel())); } } diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java index 55ce2d2..f1e9360 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingServer.java @@ -233,12 +233,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } @Override - public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { - LOG.warn("Channel {} channelWritabilityChanged event triggered - bytesBeforeUnwritable:{},bytesBeforeWritable:{}", ctx.channel(), - ctx.channel().bytesBeforeUnwritable(), ctx.channel().bytesBeforeWritable()); - } - - @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) throws Exception { putNettyEvent(new NettyChannelEvent(NettyChannelEventType.EXCEPTION, ctx.channel(), cause)); diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java new file mode 100644 index 0000000..89e4bff --- /dev/null +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/internal/RemotingUtil.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.remoting.internal; + +import io.netty.channel.Channel; +import java.net.InetSocketAddress; + +public class RemotingUtil { + public static String extractRemoteAddress(Channel channel) { + return ((InetSocketAddress) channel.remoteAddress()).getAddress().getHostAddress(); + } +}
