Repository: tajo
Updated Branches:
  refs/heads/index_support 1c53ccf7a -> 3c182347d


TAJO-1424: Investigate the problem of too many "Try to connect" messeges during 
Travic CI build.

Closes #488

Signed-off-by: Jihoon Son <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7c2a2409
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7c2a2409
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7c2a2409

Branch: refs/heads/index_support
Commit: 7c2a2409032c1bcd1fa9c636f0ef49982a995362
Parents: fdb098b
Author: navis.ryu <[email protected]>
Authored: Thu Apr 2 11:24:52 2015 +0900
Committer: Jihoon Son <[email protected]>
Committed: Thu Apr 2 11:25:11 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +++
 .../org/apache/tajo/rpc/NettyClientBase.java    | 26 +++++++++++---------
 .../org/apache/tajo/rpc/RpcConnectionPool.java  | 19 +++++++-------
 3 files changed, 27 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/7c2a2409/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 12bf84e..c3bce7d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -107,6 +107,9 @@ Release 0.11.0 - unreleased
   
   TASKS
 
+    TAJO-1424: Investigate the problem of too many "Try to connect" messeges 
+    during Travic CI build. (Contributed by navis, Committed by jihoon)
+
     TAJO-1482: Cleanup the legacy cluster mode. (jinho)
 
     TAJO-1439: Some method name is written wrongly. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c2a2409/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
index 72278f2..cdc4cc6 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java
@@ -46,6 +46,7 @@ public abstract class NettyClientBase implements Closeable {
 
   private Bootstrap bootstrap;
   private volatile ChannelFuture channelFuture;
+  private volatile long lastConnected = -1;
 
   protected final Class<?> protocol;
   protected final AtomicInteger sequence = new AtomicInteger(0);
@@ -106,20 +107,20 @@ public abstract class NettyClientBase implements 
Closeable {
   }
 
   private boolean checkConnection(long timeout) {
-    if (isConnected()) {
-      return true;
-    }
+    return isConnected() || handleConnectionInternally(key.addr, timeout);
+  }
 
-    InetSocketAddress addr = key.addr;
-    if (addr.isUnresolved()) {
-      addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort());
+  private InetSocketAddress resolveAddress(InetSocketAddress address) {
+    if (address.isUnresolved()) {
+      return RpcUtils.createSocketAddr(address.getHostName(), 
address.getPort());
     }
-
-    return handleConnectionInternally(addr, timeout);
+    return address;
   }
 
   private void connectUsingNetty(InetSocketAddress address, 
GenericFutureListener<ChannelFuture> listener) {
-    LOG.warn("Try to connect : " + address);
+    if (lastConnected > 0) {
+      LOG.warn("Try to reconnect : " + address);
+    }
     this.channelFuture = 
bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup())
             .connect(address)
             .addListener(listener);
@@ -139,7 +140,8 @@ public abstract class NettyClientBase implements Closeable {
     }
 
     if (ticket == granted) {
-      connectUsingNetty(addr, new RetryConnectionListener(addr, granted));
+      InetSocketAddress address = resolveAddress(addr);
+      connectUsingNetty(address, new RetryConnectionListener(address, 
granted));
     }
 
     try {
@@ -173,12 +175,11 @@ public abstract class NettyClientBase implements 
Closeable {
         channelFuture.channel().close();
 
         if (numRetries > retryCount.getAndIncrement()) {
-          final GenericFutureListener<ChannelFuture> currentListener = this;
 
           RpcChannelFactory.getSharedClientEventloopGroup().schedule(new 
Runnable() {
             @Override
             public void run() {
-              connectUsingNetty(address, currentListener);
+              connectUsingNetty(address, RetryConnectionListener.this);
             }
           }, PAUSE, TimeUnit.MILLISECONDS);
 
@@ -192,6 +193,7 @@ public abstract class NettyClientBase implements Closeable {
       }
       else {
         latch.countDown();
+        lastConnected = System.currentTimeMillis();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7c2a2409/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
index 6d1f479..b0ff910 100644
--- 
a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
+++ 
b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java
@@ -94,26 +94,27 @@ public class RpcConnectionPool {
   }
 
   public void releaseConnection(NettyClientBase client) {
-    release(client, false);
+    if (client != null) {
+      release(client, false);
+    }
   }
 
   public void closeConnection(NettyClientBase client) {
-    release(client, true);
+    if (client != null) {
+      release(client, true);
+    }
   }
 
   private void release(NettyClientBase client, boolean close) {
-    if (client == null) {
-      return;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Close connection [" + client.getKey() + "]");
-    }
     try {
       if (returnToPool(client, close)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing connection [" + client.getKey() + "]");
+        }
         client.close();
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Current Connections [" + connections.size() + "]");
+        LOG.debug("Current Connections in pool [" + connections.size() + "]");
       }
     } catch (Exception e) {
       LOG.error("Can't close connection:" + client.getKey() + ":" + 
e.getMessage(), e);

Reply via email to