lollipopjin commented on code in PR #7816:
URL: https://github.com/apache/rocketmq/pull/7816#discussion_r1477954085
##########
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java:
##########
@@ -715,84 +701,104 @@ private Channel createChannel(final String addr) throws
InterruptedException {
LOGGER.warn("createChannel: try to lock channel table, but
timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}
- if (cw != null) {
- return waitChannelFuture(addr, cw);
- }
-
return null;
}
- private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
- ChannelFuture channelFuture = cw.getChannelFuture();
- if
(channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis()))
{
- if (cw.isOK()) {
- LOGGER.info("createChannel: connect remote host[{}] success,
{}", addr, channelFuture.toString());
- return cw.getChannel();
- } else {
- LOGGER.warn("createChannel: connect remote host[{}] failed,
{}", addr, channelFuture.toString());
- }
- } else {
- LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms,
{}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
- channelFuture.toString());
- }
- return null;
+ private ChannelWrapper createChannel(String addr) {
+ String[] hostAndPort = getHostAndPort(addr);
+ ChannelFuture channelFuture = fetchBootstrap(addr)
+ .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
+ LOGGER.info("createChannel: begin to connect remote host[{}]
asynchronously", addr);
+ ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
+ this.channelTables.put(addr, cw);
+ this.channelWrapperTables.put(channelFuture.channel(), cw);
+ return cw;
}
@Override
public void invokeAsync(String addr, RemotingCommand request, long
timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException,
RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
- final Channel channel = this.getAndCreateChannel(addr);
- String channelRemoteAddr =
RemotingHelper.parseChannelRemoteAddr(channel);
- if (channel != null && channel.isActive()) {
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- throw new RemotingTooMuchRequestException("invokeAsync call
the addr[" + channelRemoteAddr + "] timeout");
- }
- this.invokeAsyncImpl(channel, request, timeoutMillis - costTime,
new InvokeCallbackWrapper(invokeCallback, addr));
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
+ final ChannelFuture channelFuture =
this.getAndCreateChannelAsync(addr);
+ if (channelFuture == null) {
+ invokeCallback.operationFail(new RemotingConnectException(addr));
+ return;
}
+ channelFuture.addListener(future -> {
+ if (future.isSuccess()) {
+ Channel channel = channelFuture.channel();
+ String channelRemoteAddr =
RemotingHelper.parseChannelRemoteAddr(channel);
+ if (channel != null && channel.isActive()) {
+ long costTime = System.currentTimeMillis() -
beginStartTime;
+ if (timeoutMillis < costTime) {
+ invokeCallback.operationFail(new
RemotingTooMuchRequestException("invokeAsync call the addr[" +
channelRemoteAddr + "] timeout"));
Review Comment:
Please add the real cost time to the exception message.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]