Repository: tajo Updated Branches: refs/heads/index_support 1c53ccf7a -> 3c182347d
TAJO-1424: Investigate the problem of too many "Try to connect" messages during Travis CI build. Closes #488 Signed-off-by: Jihoon Son <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7c2a2409 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7c2a2409 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7c2a2409 Branch: refs/heads/index_support Commit: 7c2a2409032c1bcd1fa9c636f0ef49982a995362 Parents: fdb098b Author: navis.ryu <[email protected]> Authored: Thu Apr 2 11:24:52 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu Apr 2 11:25:11 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 +++ .../org/apache/tajo/rpc/NettyClientBase.java | 26 +++++++++++--------- .../org/apache/tajo/rpc/RpcConnectionPool.java | 19 +++++++------- 3 files changed, 27 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/7c2a2409/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 12bf84e..c3bce7d 100644 --- a/CHANGES +++ b/CHANGES @@ -107,6 +107,9 @@ Release 0.11.0 - unreleased TASKS + TAJO-1424: Investigate the problem of too many "Try to connect" messages + during Travis CI build. (Contributed by navis, Committed by jihoon) + TAJO-1482: Cleanup the legacy cluster mode. (jinho) TAJO-1439: Some method name is written wrongly. 
http://git-wip-us.apache.org/repos/asf/tajo/blob/7c2a2409/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 72278f2..cdc4cc6 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -46,6 +46,7 @@ public abstract class NettyClientBase implements Closeable { private Bootstrap bootstrap; private volatile ChannelFuture channelFuture; + private volatile long lastConnected = -1; protected final Class<?> protocol; protected final AtomicInteger sequence = new AtomicInteger(0); @@ -106,20 +107,20 @@ public abstract class NettyClientBase implements Closeable { } private boolean checkConnection(long timeout) { - if (isConnected()) { - return true; - } + return isConnected() || handleConnectionInternally(key.addr, timeout); + } - InetSocketAddress addr = key.addr; - if (addr.isUnresolved()) { - addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort()); + private InetSocketAddress resolveAddress(InetSocketAddress address) { + if (address.isUnresolved()) { + return RpcUtils.createSocketAddr(address.getHostName(), address.getPort()); } - - return handleConnectionInternally(addr, timeout); + return address; } private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { - LOG.warn("Try to connect : " + address); + if (lastConnected > 0) { + LOG.warn("Try to reconnect : " + address); + } this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup()) .connect(address) .addListener(listener); @@ -139,7 +140,8 @@ public abstract class NettyClientBase implements Closeable { } if (ticket 
== granted) { - connectUsingNetty(addr, new RetryConnectionListener(addr, granted)); + InetSocketAddress address = resolveAddress(addr); + connectUsingNetty(address, new RetryConnectionListener(address, granted)); } try { @@ -173,12 +175,11 @@ public abstract class NettyClientBase implements Closeable { channelFuture.channel().close(); if (numRetries > retryCount.getAndIncrement()) { - final GenericFutureListener<ChannelFuture> currentListener = this; RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() { @Override public void run() { - connectUsingNetty(address, currentListener); + connectUsingNetty(address, RetryConnectionListener.this); } }, PAUSE, TimeUnit.MILLISECONDS); @@ -192,6 +193,7 @@ public abstract class NettyClientBase implements Closeable { } else { latch.countDown(); + lastConnected = System.currentTimeMillis(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/7c2a2409/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java index 6d1f479..b0ff910 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java @@ -94,26 +94,27 @@ public class RpcConnectionPool { } public void releaseConnection(NettyClientBase client) { - release(client, false); + if (client != null) { + release(client, false); + } } public void closeConnection(NettyClientBase client) { - release(client, true); + if (client != null) { + release(client, true); + } } private void release(NettyClientBase client, boolean close) { - if (client == null) { - return; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Close connection [" + client.getKey() + "]"); - } try { 
if (returnToPool(client, close)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Closing connection [" + client.getKey() + "]"); + } client.close(); } if (LOG.isDebugEnabled()) { - LOG.debug("Current Connections [" + connections.size() + "]"); + LOG.debug("Current Connections in pool [" + connections.size() + "]"); } } catch (Exception e) { LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e);
