TAJO-1391: RpcConnectionPool should check reference counter of connection before close
Closes #412 Signed-off-by: Jihun Kang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/0dc7d680 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/0dc7d680 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/0dc7d680 Branch: refs/heads/index_support Commit: 0dc7d68071dcf7c9d01dde8ed7598ca422e4c50c Parents: e1e38e2 Author: navis.ryu <[email protected]> Authored: Mon Mar 16 10:03:10 2015 +0900 Committer: Jihun Kang <[email protected]> Committed: Mon Mar 16 10:03:10 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../apache/tajo/client/SessionConnection.java | 2 +- .../org/apache/tajo/rpc/AsyncRpcClient.java | 63 ++------ .../org/apache/tajo/rpc/BlockingRpcClient.java | 68 ++------- .../org/apache/tajo/rpc/NettyClientBase.java | 148 ++++++++++++------- .../org/apache/tajo/rpc/RpcConnectionPool.java | 112 +++++++------- .../main/java/org/apache/tajo/rpc/RpcUtils.java | 54 +++++++ .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 32 ++-- .../org/apache/tajo/rpc/TestBlockingRpc.java | 34 +++-- 9 files changed, 272 insertions(+), 244 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 84a7571..9d2cd14 100644 --- a/CHANGES +++ b/CHANGES @@ -9,6 +9,9 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1391: RpcConnectionPool should check reference counter of connection + before close. (Contributed by navis, Committed by jihun) + TAJO-1383: Improve broadcast table cache. (jinho) TAJO-1374: Support multi-bytes delimiter for CSV file. http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index d05d3b1..d24e7b3 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -128,7 +128,7 @@ public class SessionConnection implements Closeable { if(!closed.get()){ try { return connPool.getConnection(serviceTracker.getClientServiceAddress(), - TajoMasterClientProtocol.class, false).isActive(); + TajoMasterClientProtocol.class, false).isConnected(); } catch (Throwable e) { return false; } http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 5845229..1ea9fb1 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -24,6 +24,7 @@ import com.google.protobuf.*; import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; @@ -34,62 +35,33 @@ import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; public class AsyncRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - private final ChannelInitializer<Channel> initializer; - private final ProxyRpcChannel rpcChannel; - - private final AtomicInteger sequence = new AtomicInteger(0); private final Map<Integer, ResponseCallback> requests = new ConcurrentHashMap<Integer, ResponseCallback>(); - private final Class<?> protocol; private final Method stubMethod; - - private RpcConnectionKey key; + private final ProxyRpcChannel rpcChannel; + private final ClientChannelInboundHandler inboundHandler; /** * Intentionally make this method package-private, avoiding user directly * new an instance through this constructor. */ - AsyncRpcClient(final Class<?> protocol, - final InetSocketAddress addr, int retries) - throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { - - this.protocol = protocol; - String serviceClassName = protocol.getName() + "$" - + protocol.getSimpleName() + "Service"; - Class<?> serviceClass = Class.forName(serviceClassName); - stubMethod = serviceClass.getMethod("newStub", RpcChannel.class); - - initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), - RpcResponse.getDefaultInstance()); - super.init(addr, initializer, retries); + AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries) + throws ClassNotFoundException, NoSuchMethodException { + super(rpcConnectionKey, retries); + stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); rpcChannel = new ProxyRpcChannel(); - this.key = new RpcConnectionKey(addr, protocol, true); - } - - @Override - public RpcConnectionKey getKey() { - return key; + inboundHandler = new ClientChannelInboundHandler(); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); } @Override public <T> T getStub() { - try { - return (T) stubMethod.invoke(null, rpcChannel); - } catch (Exception e) { - throw new RemoteException(e.getMessage(), e); - } - } - - public RpcChannel getRpcChannel() { - return this.rpcChannel; + return getStub(stubMethod, rpcChannel); } protected void sendExceptions(String message) { @@ -113,17 +85,6 @@ public class AsyncRpcClient extends NettyClientBase { } private class ProxyRpcChannel implements RpcChannel { - private final ClientChannelInboundHandler handler; - - public ProxyRpcChannel() { - this.handler = getChannel().pipeline() - .get(ClientChannelInboundHandler.class); - - if (handler == null) { - throw new IllegalArgumentException("Channel does not have " + - "proper handler"); - } - } public void callMethod(final MethodDescriptor method, final RpcController controller, @@ -135,7 +96,7 @@ public class AsyncRpcClient extends NettyClientBase { Message rpcRequest = buildRequest(nextSeqId, method, param); - handler.registerCallback(nextSeqId, + inboundHandler.registerCallback(nextSeqId, new ResponseCallback(controller, responseType, done)); ChannelPromise channelPromise = getChannel().newPromise(); @@ -144,7 +105,7 @@ public class AsyncRpcClient extends NettyClientBase { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - handler.exceptionCaught(null, new ServiceException(future.cause())); + inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); } } }); http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 4ec5718..6a90330 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -25,6 +25,7 @@ import io.netty.channel.*; import io.netty.util.concurrent.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; @@ -35,63 +36,33 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; public class BlockingRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(RpcProtos.class); - private final ChannelInitializer<Channel> initializer; - private final ProxyRpcChannel rpcChannel; - - private final AtomicInteger sequence = new AtomicInteger(0); private final Map<Integer, ProtoCallFuture> requests = new ConcurrentHashMap<Integer, ProtoCallFuture>(); - private final Class<?> protocol; private final Method stubMethod; - - private RpcConnectionKey key; + private final ProxyRpcChannel rpcChannel; + private final ChannelInboundHandlerAdapter inboundHandler; /** * Intentionally make this method package-private, avoiding user directly * new an instance through this constructor. */ - BlockingRpcClient(final Class<?> protocol, - final InetSocketAddress addr, int retries) - throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { - - this.protocol = protocol; - String serviceClassName = protocol.getName() + "$" - + protocol.getSimpleName() + "Service"; - Class<?> serviceClass = Class.forName(serviceClassName); - stubMethod = serviceClass.getMethod("newBlockingStub", - BlockingRpcChannel.class); - - initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance()); - super.init(addr, initializer, retries); + BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries) + throws ClassNotFoundException, NoSuchMethodException { + super(rpcConnectionKey, retries); + stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); rpcChannel = new ProxyRpcChannel(); - - this.key = new RpcConnectionKey(addr, protocol, false); - } - - @Override - public RpcConnectionKey getKey() { - return key; + inboundHandler = new ClientChannelInboundHandler(); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance())); } @Override public <T> T getStub() { - try { - return (T) stubMethod.invoke(null, rpcChannel); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - public BlockingRpcChannel getBlockingRpcChannel() { - return this.rpcChannel; + return getStub(stubMethod, rpcChannel); } @Override @@ -106,19 +77,6 @@ public class BlockingRpcClient extends NettyClientBase { private class ProxyRpcChannel implements BlockingRpcChannel { - private final ClientChannelInboundHandler handler; - - public ProxyRpcChannel() { - - this.handler = getChannel().pipeline(). - get(ClientChannelInboundHandler.class); - - if (handler == null) { - throw new IllegalArgumentException("Channel does not have " + - "proper handler"); - } - } - @Override public Message callBlockingMethod(final MethodDescriptor method, final RpcController controller, @@ -139,7 +97,7 @@ public class BlockingRpcClient extends NettyClientBase { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - handler.exceptionCaught(null, new ServiceException(future.cause())); + inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); } } }); @@ -174,7 +132,7 @@ public class BlockingRpcClient extends NettyClientBase { } private String getErrorMessage(String message) { - if(protocol != null && getChannel() != null) { + if(getChannel() != null) { return protocol.getName() + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) getChannel().remoteAddress()) + "): " + message; @@ -184,7 +142,7 @@ public class BlockingRpcClient extends NettyClientBase { } private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) { - if(protocol != null && getChannel() != null) { + if(getChannel() != null) { return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(), RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress())); } else { http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 7b52178..7dfc5a2 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -20,9 +20,9 @@ package org.apache.tajo.rpc; import io.netty.channel.*; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; @@ -30,77 +30,125 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GenericFutureListener; import java.io.Closeable; +import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public abstract class NettyClientBase implements Closeable { - private static Log LOG = LogFactory.getLog(NettyClientBase.class); - private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60; + private static final Log LOG = LogFactory.getLog(NettyClientBase.class); + private static final int CONNECTION_TIMEOUT = 60000; // 60 sec private static final long PAUSE = 1000; // 1 sec - private int numRetries; - protected Bootstrap bootstrap; - private ChannelFuture channelFuture; + private final int numRetries; - public NettyClientBase() { - } + private Bootstrap bootstrap; + private volatile ChannelFuture channelFuture; - public abstract <T> T getStub(); - public abstract RpcConnectionPool.RpcConnectionKey getKey(); - - public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer, - int numRetries) throws ConnectTimeoutException { + protected final Class<?> protocol; + protected final AtomicInteger sequence = new AtomicInteger(0); + + private final RpcConnectionKey key; + private final AtomicInteger counter = new AtomicInteger(0); // reference counter + + public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) + throws ClassNotFoundException, NoSuchMethodException { + this.key = rpcConnectionKey; + this.protocol = rpcConnectionKey.protocolClass; this.numRetries = numRetries; - - init(addr, initializer); } - public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer) - throws ConnectTimeoutException { + // should be called from sub class + protected void init(ChannelInitializer<Channel> initializer) { this.bootstrap = new Bootstrap(); this.bootstrap .channel(NioSocketChannel.class) .handler(initializer) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT) .option(ChannelOption.SO_RCVBUF, 1048576 * 10) .option(ChannelOption.TCP_NODELAY, true); + } - connect(addr); + public RpcConnectionPool.RpcConnectionKey getKey() { + return key; } - private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { + protected final Class<?> getServiceClass() throws ClassNotFoundException { + String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service"; + return Class.forName(serviceClassName); + } + @SuppressWarnings("unchecked") + protected final <T> T getStub(Method stubMethod, Object rpcChannel) { + try { + return (T) stubMethod.invoke(null, rpcChannel); + } catch (Exception e) { + throw new RemoteException(e.getMessage(), e); + } + } + + public abstract <T> T getStub(); + + public boolean acquire(long timeout) { + if (!checkConnection(timeout)) { + return false; + } + counter.incrementAndGet(); + return true; + } + + public boolean release() { + return counter.decrementAndGet() == 0; + } + + private boolean checkConnection(long timeout) { + if (isConnected()) { + return true; + } + + InetSocketAddress addr = key.addr; + if (addr.isUnresolved()) { + addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort()); + } + + return handleConnectionInternally(addr, timeout); + } + + private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { + LOG.warn("Try to connect : " + address); this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup()) .connect(address) .addListener(listener); } - - private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException { - final CountDownLatch latch = new CountDownLatch(1); - GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch); - connectUsingNetty(addr, listener); + + // first attendant kicks connection + private final RpcUtils.Scrutineer<CountDownLatch> connect = new RpcUtils.Scrutineer<CountDownLatch>(); + + private boolean handleConnectionInternally(final InetSocketAddress addr, long timeout) { + final CountDownLatch ticket = new CountDownLatch(1); + final CountDownLatch granted = connect.check(ticket); + + if (ticket == granted) { + connectUsingNetty(addr, new RetryConnectionListener(addr, granted)); + } try { - latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS); + granted.await(timeout, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + // ignore } - if (!channelFuture.isSuccess()) { - throw new ConnectTimeoutException("Connect error to " + addr + - " caused by " + ExceptionUtils.getMessage(channelFuture.cause())); - } - } + boolean success = channelFuture.isSuccess(); - public void connect(InetSocketAddress addr) throws ConnectTimeoutException { - if(addr.isUnresolved()){ - addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort()); + if (granted.getCount() == 0) { + connect.clear(granted); } - handleConnectionInternally(addr); + return success; } class RetryConnectionListener implements GenericFutureListener<ChannelFuture> { @@ -142,32 +190,26 @@ public abstract class NettyClientBase implements Closeable { } } - public boolean isActive() { - return getChannel().isActive(); + public Channel getChannel() { + return channelFuture == null ? null : channelFuture.channel(); } - public InetSocketAddress getRemoteAddress() { - if (channelFuture == null || channelFuture.channel() == null) { - return null; - } - return (InetSocketAddress) channelFuture.channel().remoteAddress(); + public boolean isConnected() { + Channel channel = getChannel(); + return channel != null && channel.isOpen() && channel.isActive(); } - public Channel getChannel() { - return channelFuture.channel(); + public SocketAddress getRemoteAddress() { + Channel channel = getChannel(); + return channel == null ? null : channel.remoteAddress(); } @Override public void close() { - if (channelFuture != null && getChannel().isActive()) { - getChannel().close(); - } - - if (this.bootstrap != null) { - InetSocketAddress address = getRemoteAddress(); - if (address != null) { - LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort()); - } + Channel channel = getChannel(); + if (channel != null && channel.isOpen()) { + LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress()); + channel.close(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java index 43feeb1..6d1f479 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java @@ -18,13 +18,9 @@ package org.apache.tajo.rpc; -import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.logging.CommonsLoggerFactory; import io.netty.util.internal.logging.InternalLoggerFactory; @@ -37,7 +33,6 @@ public class RpcConnectionPool { private Map<RpcConnectionKey, NettyClientBase> connections = new HashMap<RpcConnectionKey, NettyClientBase>(); - private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static RpcConnectionPool instance; private final Object lockObject = new Object(); @@ -59,103 +54,101 @@ public class RpcConnectionPool { throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { NettyClientBase client; if(rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, - RPC_RETRIES); + client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES); } else { - client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, - RPC_RETRIES); + client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES); } - accepted.add(client.getChannel()); return client; } + public static final long DEFAULT_TIMEOUT = 3000; + public static final long DEFAULT_INTERVAL = 500; + public NettyClientBase getConnection(InetSocketAddress addr, Class<?> protocolClass, boolean asyncMode) throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + return getConnection(addr, protocolClass, asyncMode, DEFAULT_TIMEOUT, DEFAULT_INTERVAL); + } + + public NettyClientBase getConnection(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode, long timeout, long interval) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); - NettyClientBase client = connections.get(key); - if (client == null) { - synchronized (lockObject){ + RpcUtils.Timer timer = new RpcUtils.Timer(timeout); + for (; !timer.isTimedOut(); timer.elapsed()) { + NettyClientBase client; + synchronized (lockObject) { client = connections.get(key); if (client == null) { - client = makeConnection(key); - connections.put(key, client); + connections.put(key, client = makeConnection(key)); } } + if (client.acquire(timer.remaining())) { + return client; + } + timer.interval(interval); } - if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) { - LOG.warn("Try to reconnect : " + addr); - client.connect(addr); - } - return client; + throw new ConnectTimeoutException("Failed to get connection for " + timeout + " msec"); } public void releaseConnection(NettyClientBase client) { - if (client == null) return; - - try { - synchronized (lockObject) { - if (!client.getChannel().isOpen()) { - connections.remove(client.getKey()); - client.close(); - } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("Current Connections [" + connections.size() + "] Accepted: " + accepted.size()); - - } - } catch (Exception e) { - LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e); - } + release(client, false); } public void closeConnection(NettyClientBase client) { + release(client, true); + } + + private void release(NettyClientBase client, boolean close) { if (client == null) { return; } - + if (LOG.isDebugEnabled()) { + LOG.debug("Close connection [" + client.getKey() + "]"); + } try { - if(LOG.isDebugEnabled()) { - LOG.debug("Close connection [" + client.getKey() + "]"); - } - - synchronized (lockObject) { - connections.remove(client.getKey()); + if (returnToPool(client, close)) { client.close(); } - + if (LOG.isDebugEnabled()) { + LOG.debug("Current Connections [" + connections.size() + "]"); + } } catch (Exception e) { LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e); } } - public synchronized void close() { + // return true if the connection should be closed + private boolean returnToPool(NettyClientBase client, boolean close) { + synchronized (lockObject) { + if (client.release() && (close || !client.isConnected())) { + connections.remove(client.getKey()); + return true; + } + } + return false; + } + + public void close() { if(LOG.isDebugEnabled()) { LOG.debug("Pool Closed"); } - synchronized(lockObject) { - for(NettyClientBase eachClient: connections.values()) { + + synchronized (lockObject) { + for (NettyClientBase eachClient : connections.values()) { try { eachClient.close(); } catch (Exception e) { LOG.error("close client pool error", e); } } - connections.clear(); } - - try { - accepted.close(); - } catch (Throwable t) { - LOG.error(t, t); - } } - public synchronized void shutdown(){ + public void shutdown(){ close(); RpcChannelFactory.shutdownGracefully(); } @@ -165,16 +158,19 @@ public class RpcConnectionPool { final Class<?> protocolClass; final boolean asyncMode; + final String description; + public RpcConnectionKey(InetSocketAddress addr, Class<?> protocolClass, boolean asyncMode) { this.addr = addr; this.protocolClass = protocolClass; this.asyncMode = asyncMode; + this.description = "["+ protocolClass + "] " + addr + "," + asyncMode; } @Override public String toString() { - return "["+ protocolClass + "] " + addr + "," + asyncMode; + return description; } @Override @@ -188,7 +184,7 @@ public class RpcConnectionPool { @Override public int hashCode() { - return Objects.hashCode(addr, asyncMode); + return description.hashCode(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java index b6be05f..152d426 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcUtils.java @@ -21,6 +21,7 @@ package org.apache.tajo.rpc; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicReference; public class RpcUtils { @@ -65,4 +66,57 @@ public class RpcUtils { String [] splitted = addr.split(":"); return InetSocketAddress.createUnresolved(splitted[0], Integer.parseInt(splitted[1])); } + + public static class Timer { + private long remaining; + private long prev; + public Timer(long timeout) { + this.remaining = timeout; + this.prev = System.currentTimeMillis(); + } + + public boolean isTimedOut() { + return remaining <= 0; + } + + public void elapsed() { + long current = System.currentTimeMillis(); + remaining -= (prev - current); + prev = current; + } + + public void interval(long wait) { + if (wait <= 0 || isTimedOut()) { + return; + } + try { + Thread.sleep(Math.min(remaining, wait)); + } catch (Exception ex) { + // ignore + } + } + + public long remaining() { + return remaining; + } + } + + public static class Scrutineer<T> { + + private final AtomicReference<T> reference = new AtomicReference<T>(); + + T check(T ticket) { + T granted = reference.get(); + for (;granted == null; granted = reference.get()) { + if (reference.compareAndSet(null, ticket)) { + return ticket; + } + } + return granted; + } + + boolean clear(T granted) { + return reference.compareAndSet(granted, null); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 31d5265..a974a65 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -34,8 +34,6 @@ import org.junit.rules.ExternalResource; import org.junit.runner.Description; import org.junit.runners.model.Statement; -import io.netty.channel.ConnectTimeoutException; - import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @@ -125,8 +123,12 @@ public class TestAsyncRpc { public void setUpRpcClient() throws Exception { retries = 1; - client = new AsyncRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(server.getListenAddress()), retries); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT); stub = client.getStub(); } @@ -296,7 +298,10 @@ public class TestAsyncRpc { }); serverThread.start(); - client = new AsyncRpcClient(DummyProtocol.class, address, retries); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); stub = client.getStub(); stub.echo(future.getController(), echoMessage, future); @@ -308,24 +313,25 @@ public class TestAsyncRpc { @Test public void testConnectionFailure() throws Exception { InetSocketAddress address = new InetSocketAddress("test", 0); - boolean expected = false; try { - new AsyncRpcClient(DummyProtocol.class, address, retries); - fail(); - } catch (ConnectTimeoutException e) { - expected = true; + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, true); + NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries); + assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); } catch (Throwable throwable) { fail(); } - assertTrue(expected); } @Test @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - client = new AsyncRpcClient(DummyProtocol.class, - RpcUtils.createUnresolved(hostAndPort), retries); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); Interface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/0dc7d680/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 07e2dca..10dd766 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -35,7 +35,6 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; @@ -116,8 +115,12 @@ public class TestBlockingRpc { public void setUpRpcClient() throws Exception { retries = 1; - client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(server.getListenAddress()), retries); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); stub = client.getStub(); } @@ -238,7 +241,10 @@ public class TestBlockingRpc { }); serverThread.start(); - client = new BlockingRpcClient(DummyProtocol.class, address, retries); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey(address, DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); stub = client.getStub(); EchoMessage response = stub.echo(null, message); @@ -247,24 +253,23 @@ public class TestBlockingRpc { @Test public void testConnectionFailed() throws Exception { - boolean expected = false; NettyClientBase client = null; try { int port = server.getListenAddress().getPort() + 1; - client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), + DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertFalse(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); client.close(); - fail("Connection should be failed."); - } catch (ConnectException ce) { - expected = true; } catch (Throwable ce){ if (client != null) { client.close(); } fail(); } - assertTrue(expected); } @Test @@ -329,8 +334,11 @@ public class TestBlockingRpc { @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.createUnresolved(hostAndPort), retries); + RpcConnectionPool.RpcConnectionKey rpcConnectionKey = + new RpcConnectionPool.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + assertTrue(client.acquire(RpcConnectionPool.DEFAULT_TIMEOUT)); BlockingInterface stub = client.getStub(); EchoMessage message = EchoMessage.newBuilder()
