Repository: tajo Updated Branches: refs/heads/branch-0.10.1 5e1fa93b5 -> 47008c58e
http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 4ec5718..61e2f04 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -18,80 +18,57 @@ package org.apache.tajo.rpc; -import com.google.protobuf.*; +import com.google.protobuf.BlockingRpcChannel; import com.google.protobuf.Descriptors.MethodDescriptor; - +import com.google.protobuf.Message; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import io.netty.channel.*; -import io.netty.util.concurrent.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.concurrent.GenericFutureListener; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; - import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.*; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; public class BlockingRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(RpcProtos.class); - private final ChannelInitializer<Channel> initializer; - private final ProxyRpcChannel rpcChannel; - - private final AtomicInteger sequence = new AtomicInteger(0); private final Map<Integer, ProtoCallFuture> requests = new ConcurrentHashMap<Integer, ProtoCallFuture>(); - private final Class<?> protocol; private final Method stubMethod; - - private RpcConnectionKey key; + private final ProxyRpcChannel rpcChannel; + private final ChannelInboundHandlerAdapter inboundHandler; /** * Intentionally make this method package-private, avoiding user directly * new an instance through this constructor. */ - BlockingRpcClient(final Class<?> protocol, - final InetSocketAddress addr, int retries) - throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { - - this.protocol = protocol; - String serviceClassName = protocol.getName() + "$" - + protocol.getSimpleName() + "Service"; - Class<?> serviceClass = Class.forName(serviceClassName); - stubMethod = serviceClass.getMethod("newBlockingStub", - BlockingRpcChannel.class); - - initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance()); - super.init(addr, initializer, retries); - rpcChannel = new ProxyRpcChannel(); - - this.key = new RpcConnectionKey(addr, protocol, false); + BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries) + throws NoSuchMethodException, ClassNotFoundException { + this(rpcConnectionKey, retries, 0); } - @Override - public RpcConnectionKey getKey() { - return key; + BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, int idleTimeSeconds) + throws ClassNotFoundException, NoSuchMethodException { + super(rpcConnectionKey, retries); + stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); + rpcChannel = new ProxyRpcChannel(); + inboundHandler = new ClientChannelInboundHandler(); + init(new ProtoChannelInitializer(inboundHandler, RpcResponse.getDefaultInstance(), idleTimeSeconds)); } @Override public <T> T getStub() { - try { - return (T) stubMethod.invoke(null, rpcChannel); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } - - public BlockingRpcChannel getBlockingRpcChannel() { - return this.rpcChannel; + return getStub(stubMethod, rpcChannel); } @Override @@ -100,25 +77,12 @@ public class BlockingRpcClient extends NettyClientBase { callback.setFailed("BlockingRpcClient terminates all the connections", new ServiceException("BlockingRpcClient terminates all the connections")); } - + requests.clear(); super.close(); } private class ProxyRpcChannel implements BlockingRpcChannel { - private final ClientChannelInboundHandler handler; - - public ProxyRpcChannel() { - - this.handler = getChannel().pipeline(). - get(ClientChannelInboundHandler.class); - - if (handler == null) { - throw new IllegalArgumentException("Channel does not have " + - "proper handler"); - } - } - @Override public Message callBlockingMethod(final MethodDescriptor method, final RpcController controller, @@ -139,7 +103,7 @@ public class BlockingRpcClient extends NettyClientBase { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { - handler.exceptionCaught(null, new ServiceException(future.cause())); + inboundHandler.exceptionCaught(null, new ServiceException(future.cause())); } } }); @@ -174,7 +138,7 @@ public class BlockingRpcClient extends NettyClientBase { } private String getErrorMessage(String message) { - if(protocol != null && getChannel() != null) { + if(getChannel() != null) { return protocol.getName() + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) getChannel().remoteAddress()) + "): " + message; @@ -184,7 +148,7 @@ public class BlockingRpcClient extends NettyClientBase { } private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) { - if(protocol != null && getChannel() != null) { + if(getChannel() != null) { return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(), RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress())); } else { @@ -193,39 +157,29 @@ public class BlockingRpcClient extends NettyClientBase { } @ChannelHandler.Sharable - private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { + private class ClientChannelInboundHandler extends SimpleChannelInboundHandler<RpcResponse> { @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - - if (msg instanceof RpcResponse) { - try { - RpcResponse rpcResponse = (RpcResponse) msg; - ProtoCallFuture callback = requests.remove(rpcResponse.getId()); + protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception { + ProtoCallFuture callback = requests.remove(rpcResponse.getId()); - if (callback == null) { - LOG.warn("Dangling rpc call"); + if (callback == null) { + LOG.warn("Dangling rpc call"); + } else { + if (rpcResponse.hasErrorMessage()) { + callback.setFailed(rpcResponse.getErrorMessage(), + makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); + } else { + Message responseMessage; + + if (!rpcResponse.hasResponseMessage()) { + responseMessage = null; } else { - if (rpcResponse.hasErrorMessage()) { - callback.setFailed(rpcResponse.getErrorMessage(), - makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); - throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage())); - } else { - Message responseMessage; - - if (!rpcResponse.hasResponseMessage()) { - responseMessage = null; - } else { - responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) - .build(); - } - - callback.setResponse(responseMessage); - } + responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) + .build(); } - } finally { - ReferenceCountUtil.release(msg); + + callback.setResponse(responseMessage); } } } @@ -233,22 +187,39 @@ public class BlockingRpcClient extends NettyClientBase { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + /* Current requests will be failed */ for(ProtoCallFuture callback: requests.values()) { callback.setFailed(cause.getMessage(), cause); } - + requests.clear(); + if(LOG.isDebugEnabled()) { LOG.error("" + cause.getMessage(), cause); } else { LOG.error("RPC Exception:" + cause.getMessage()); } - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + super.channelActive(ctx); + LOG.info("Connection established successfully : " + ctx.channel().remoteAddress()); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + /* If all requests is done and event is triggered, channel will be closed. */ + if (e.state() == IdleState.ALL_IDLE && requests.size() == 0) { + ctx.close(); + LOG.warn("Idle connection closed successfully :" + ctx.channel().remoteAddress()); + } } } } - static class ProtoCallFuture implements Future<Message> { + static class ProtoCallFuture implements Future<Message> { private Semaphore sem = new Semaphore(0); private Message response = null; private Message returnType; http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java index 0ce359f..bb31367 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -22,15 +22,12 @@ import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcController; - import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import io.netty.util.ReferenceCountUtil; - import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -62,7 +59,7 @@ public class BlockingRpcServer extends NettyServerBase { } @ChannelHandler.Sharable - private class ServerHandler extends ChannelInboundHandlerAdapter { + private class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { @@ -83,52 +80,43 @@ public class BlockingRpcServer extends NettyServerBase { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, RpcRequest request) throws Exception { - if (msg instanceof RpcRequest) { + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } + Message paramProto = null; + if (request.hasRequestMessage()) { try { - final RpcRequest request = (RpcRequest) msg; - - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); - } - Message paramProto = null; - if (request.hasRequestMessage()) { - try { - paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() - .mergeFrom(request.getRequestMessage()).build(); - - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - } - Message returnValue; - RpcController controller = new NettyRpcController(); - - try { - returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - - RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } - - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } - ctx.writeAndFlush(builder.build()); - } finally { - ReferenceCountUtil.release(msg); + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); } } + Message returnValue; + RpcController controller = new NettyRpcController(); + + try { + returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } + + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } + ctx.writeAndFlush(builder.build()); } @Override @@ -137,11 +125,6 @@ public class BlockingRpcServer extends NettyServerBase { RemoteCallException callException = (RemoteCallException) cause; ctx.writeAndFlush(callException.getResponse()); } - - if (ctx != null && ctx.channel().isActive()) { - ctx.channel().close(); - } } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java new file mode 100644 index 0000000..29c9772 --- /dev/null +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ConnectionCloseFutureListener.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; + +public class ConnectionCloseFutureListener implements GenericFutureListener { + private RpcClientManager.RpcConnectionKey key; + + public ConnectionCloseFutureListener(RpcClientManager.RpcConnectionKey key) { + this.key = key; + } + + @Override + public void operationComplete(Future future) throws Exception { + RpcClientManager.remove(key); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index 7b52178..a75148b 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -18,156 +18,150 @@ package org.apache.tajo.rpc; -import io.netty.channel.*; - -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.util.concurrent.GenericFutureListener; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import java.io.Closeable; +import java.lang.reflect.Method; import java.net.InetSocketAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.net.SocketAddress; import java.util.concurrent.atomic.AtomicInteger; public abstract class NettyClientBase implements Closeable { - private static Log LOG = LogFactory.getLog(NettyClientBase.class); - private static final int CLIENT_CONNECTION_TIMEOUT_SEC = 60; + private static final Log LOG = LogFactory.getLog(NettyClientBase.class); + private static final int CONNECTION_TIMEOUT = 60000; // 60 sec private static final long PAUSE = 1000; // 1 sec - private int numRetries; - protected Bootstrap bootstrap; - private ChannelFuture channelFuture; + private final int numRetries; - public NettyClientBase() { - } + private Bootstrap bootstrap; + private volatile ChannelFuture channelFuture; - public abstract <T> T getStub(); - public abstract RpcConnectionPool.RpcConnectionKey getKey(); - - public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer, - int numRetries) throws ConnectTimeoutException { + protected final Class<?> protocol; + protected final AtomicInteger sequence = new AtomicInteger(0); + + private final RpcConnectionKey key; + + public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) + throws ClassNotFoundException, NoSuchMethodException { + this.key = rpcConnectionKey; + this.protocol = rpcConnectionKey.protocolClass; this.numRetries = numRetries; - - init(addr, initializer); } - public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer) - throws ConnectTimeoutException { + // should be called from sub class + protected void init(ChannelInitializer<Channel> initializer) { this.bootstrap = new Bootstrap(); this.bootstrap - .channel(NioSocketChannel.class) - .handler(initializer) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) - .option(ChannelOption.SO_RCVBUF, 1048576 * 10) - .option(ChannelOption.TCP_NODELAY, true); - - connect(addr); + .group(RpcChannelFactory.getSharedClientEventloopGroup()) + .channel(NioSocketChannel.class) + .handler(initializer) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, CONNECTION_TIMEOUT) + .option(ChannelOption.SO_RCVBUF, 1048576 * 10) + .option(ChannelOption.TCP_NODELAY, true); } - private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { + public RpcClientManager.RpcConnectionKey getKey() { + return key; + } - this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup()) - .connect(address) - .addListener(listener); + protected final Class<?> getServiceClass() throws ClassNotFoundException { + String serviceClassName = protocol.getName() + "$" + protocol.getSimpleName() + "Service"; + return Class.forName(serviceClassName); } - - private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException { - final CountDownLatch latch = new CountDownLatch(1); - GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch); - connectUsingNetty(addr, listener); + @SuppressWarnings("unchecked") + protected final <T> T getStub(Method stubMethod, Object rpcChannel) { try { - latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS); - } catch (InterruptedException e) { - } - - if (!channelFuture.isSuccess()) { - throw new ConnectTimeoutException("Connect error to " + addr + - " caused by " + ExceptionUtils.getMessage(channelFuture.cause())); + return (T) stubMethod.invoke(null, rpcChannel); + } catch (Exception e) { + throw new RemoteException(e.getMessage(), e); } } - public void connect(InetSocketAddress addr) throws ConnectTimeoutException { - if(addr.isUnresolved()){ - addr = RpcUtils.createSocketAddr(addr.getHostName(), addr.getPort()); + public abstract <T> T getStub(); + + + private InetSocketAddress resolveAddress(InetSocketAddress address) { + if (address.isUnresolved()) { + return RpcUtils.createSocketAddr(address.getHostName(), address.getPort()); } + return address; + } - handleConnectionInternally(addr); + private ChannelFuture doConnect(SocketAddress address) { + return this.channelFuture = bootstrap.clone().connect(address); } - class RetryConnectionListener implements GenericFutureListener<ChannelFuture> { - private final AtomicInteger retryCount = new AtomicInteger(); - private final InetSocketAddress address; - private final CountDownLatch latch; - RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) { - this.address = address; - this.latch = latch; + public synchronized void connect() throws ConnectTimeoutException { + if (isConnected()) return; + + final AtomicInteger retries = new AtomicInteger(); + InetSocketAddress address = key.addr; + if (address.isUnresolved()) { + address = resolveAddress(address); } - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - if (!channelFuture.isSuccess()) { - channelFuture.channel().close(); + /* do not call await() inside handler */ + ChannelFuture f = doConnect(address).awaitUninterruptibly(); + retries.incrementAndGet(); + + if (!f.isSuccess() && numRetries > 0) { + doReconnect(address, f, retries); + } + } - if (numRetries > retryCount.getAndIncrement()) { - final GenericFutureListener<ChannelFuture> currentListener = this; + private void doReconnect(final InetSocketAddress address, ChannelFuture future, AtomicInteger retries) + throws ConnectTimeoutException { - RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() { - @Override - public void run() { - connectUsingNetty(address, currentListener); - } - }, PAUSE, TimeUnit.MILLISECONDS); + for (; ; ) { + if (numRetries >= retries.getAndIncrement()) { - LOG.debug("Connecting to " + address + " has been failed. Retrying to connect."); + LOG.warn(future.cause().getMessage() + " Try to reconnect"); + try { + Thread.sleep(PAUSE); + } catch (InterruptedException e) { } - else { - latch.countDown(); - LOG.error("Max retry count has been exceeded. attempts=" + numRetries); + this.channelFuture = doConnect(address).awaitUninterruptibly(); + if (this.channelFuture.isDone() && this.channelFuture.isSuccess()) { + break; } - } - else { - latch.countDown(); + } else { + throw new ConnectTimeoutException("Max retry count has been exceeded. attempts=" + numRetries + + " caused by: " + future.cause()); } } } - public boolean isActive() { - return getChannel().isActive(); + public Channel getChannel() { + return channelFuture == null ? null : channelFuture.channel(); } - public InetSocketAddress getRemoteAddress() { - if (channelFuture == null || channelFuture.channel() == null) { - return null; - } - return (InetSocketAddress) channelFuture.channel().remoteAddress(); + public boolean isConnected() { + Channel channel = getChannel(); + return channel != null && channel.isOpen() && channel.isActive(); } - public Channel getChannel() { - return channelFuture.channel(); + public SocketAddress getRemoteAddress() { + Channel channel = getChannel(); + return channel == null ? null : channel.remoteAddress(); } @Override public void close() { - if (channelFuture != null && getChannel().isActive()) { - getChannel().close(); - } - - if (this.bootstrap != null) { - InetSocketAddress address = getRemoteAddress(); - if (address != null) { - LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort()); - } + Channel channel = getChannel(); + if (channel != null && channel.isOpen()) { + LOG.debug("Proxy will be disconnected from remote " + channel.remoteAddress()); + channel.close().awaitUninterruptibly(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java index 6a340dc..74eb650 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java @@ -18,6 +18,7 @@ package org.apache.tajo.rpc; +import com.google.protobuf.MessageLite; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; @@ -26,16 +27,21 @@ import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; - -import com.google.protobuf.MessageLite; +import io.netty.handler.timeout.IdleStateHandler; class ProtoChannelInitializer extends ChannelInitializer<Channel> { private final MessageLite defaultInstance; private final ChannelHandler handler; + private final int idleTimeSeconds; public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) { + this(handler, defaultInstance, 0); + } + + public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance, int idleTimeSeconds) { this.handler = handler; this.defaultInstance = defaultInstance; + this.idleTimeSeconds = idleTimeSeconds; } @Override @@ -45,6 +51,7 @@ class ProtoChannelInitializer extends ChannelInitializer<Channel> { pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast("protobufEncoder", new ProtobufEncoder()); + pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTimeSeconds)); //zero is disabling pipeline.addLast("handler", handler); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java new file mode 100644 index 0000000..f05fb97 --- /dev/null +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.channel.ConnectTimeoutException; +import io.netty.util.internal.logging.CommonsLoggerFactory; +import io.netty.util.internal.logging.InternalLoggerFactory; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.annotation.concurrent.ThreadSafe; +import java.net.InetSocketAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@ThreadSafe +public class RpcClientManager { + private static final Log LOG = LogFactory.getLog(RpcClientManager.class); + + public static final int RPC_RETRIES = 3; + + /* If all requests is done and client is idle state, client will be removed. */ + public static final int RPC_IDLE_TIMEOUT = 43200; // 12 hour + + /* entries will be removed by ConnectionCloseFutureListener */ + private static final Map<RpcConnectionKey, NettyClientBase> + clients = Collections.synchronizedMap(new HashMap<RpcConnectionKey, NettyClientBase>()); + + private static RpcClientManager instance; + + static { + InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); + instance = new RpcClientManager(); + } + + private RpcClientManager() { + } + + public static RpcClientManager getInstance() { + return instance; + } + + private NettyClientBase makeClient(RpcConnectionKey rpcConnectionKey) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + NettyClientBase client; + if (rpcConnectionKey.asyncMode) { + client = new AsyncRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT); + } else { + client = new BlockingRpcClient(rpcConnectionKey, RPC_RETRIES, RPC_IDLE_TIMEOUT); + } + return client; + } + + /** + * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, and returns rpc client by protocol. + * This client will be shared per protocol and address. Client is removed in shared map when a client is closed + * @param addr + * @param protocolClass + * @param asyncMode + * @return + * @throws NoSuchMethodException + * @throws ClassNotFoundException + * @throws ConnectTimeoutException + */ + public NettyClientBase getClient(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) + throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { + RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); + + NettyClientBase client; + synchronized (clients) { + client = clients.get(key); + if (client == null) { + clients.put(key, client = makeClient(key)); + } + } + + if (!client.isConnected()) { + client.connect(); + client.getChannel().closeFuture().addListener(new ConnectionCloseFutureListener(key)); + } + assert client.isConnected(); + return client; + } + + /** + * Request to close this clients + * After it is closed, it is removed from clients map. + */ + public static void close() { + LOG.info("Closing RPC client manager"); + + for (NettyClientBase eachClient : clients.values()) { + try { + eachClient.close(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + + /** + * Close client manager and shutdown Netty RPC worker pool + * After it is shutdown it is not possible to reuse it again. + */ + public static void shutdown() { + close(); + RpcChannelFactory.shutdownGracefully(); + } + + protected static NettyClientBase remove(RpcConnectionKey key) { + LOG.debug("Removing shared rpc client :" + key); + return clients.remove(key); + } + + protected static boolean contains(RpcConnectionKey key) { + return clients.containsKey(key); + } + + public static void cleanup(NettyClientBase... clients) { + for (NettyClientBase client : clients) { + if (client != null) { + try { + client.close(); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Exception in closing " + client.getKey(), e); + } + } + } + } + } + + static class RpcConnectionKey { + final InetSocketAddress addr; + final Class<?> protocolClass; + final boolean asyncMode; + + final String description; + + public RpcConnectionKey(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) { + this.addr = addr; + this.protocolClass = protocolClass; + this.asyncMode = asyncMode; + this.description = "[" + protocolClass + "] " + addr + "," + asyncMode; + } + + @Override + public String toString() { + return description; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RpcConnectionKey)) { + return false; + } + + return toString().equals(obj.toString()); + } + + @Override + public int hashCode() { + return description.hashCode(); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java deleted file mode 100644 index 43feeb1..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.common.base.Objects; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.group.ChannelGroup; -import io.netty.channel.group.DefaultChannelGroup; -import io.netty.util.concurrent.GlobalEventExecutor; -import io.netty.util.internal.logging.CommonsLoggerFactory; -import io.netty.util.internal.logging.InternalLoggerFactory; - -import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.Map; - -public class RpcConnectionPool { - private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class); - - private Map<RpcConnectionKey, NettyClientBase> connections = - new HashMap<RpcConnectionKey, NettyClientBase>(); - private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - - private static RpcConnectionPool instance; - private final Object lockObject = new Object(); - - public final static int RPC_RETRIES = 3; - - private RpcConnectionPool() { - } - - public synchronized static RpcConnectionPool getPool() { - if(instance == null) { - InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); - instance = new RpcConnectionPool(); - } - return instance; - } - - private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { - NettyClientBase client; - if(rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, - RPC_RETRIES); - } else { - client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, - RPC_RETRIES); - } - accepted.add(client.getChannel()); - return client; - } - - public NettyClientBase getConnection(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) - throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { - RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); - NettyClientBase client = connections.get(key); - - if (client == null) { - synchronized (lockObject){ - client = connections.get(key); - if (client == null) { - client = makeConnection(key); - connections.put(key, client); - } - } - } - - if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) { - LOG.warn("Try to reconnect : " + addr); - client.connect(addr); - } - return client; - } - - public void releaseConnection(NettyClientBase client) { - if (client == null) return; - - try { - synchronized (lockObject) { - if (!client.getChannel().isOpen()) { - connections.remove(client.getKey()); - client.close(); - } - } - - if(LOG.isDebugEnabled()) { - LOG.debug("Current Connections [" + connections.size() + "] Accepted: " + accepted.size()); - - } - } catch (Exception e) { - LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e); - } - } - - public void closeConnection(NettyClientBase client) { - if (client == null) { - return; - } - - try { - if(LOG.isDebugEnabled()) { - LOG.debug("Close connection [" + client.getKey() + "]"); - } - - synchronized (lockObject) { - connections.remove(client.getKey()); - client.close(); - } - - } catch (Exception e) { - LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e); - } - } - - public synchronized void close() { - if(LOG.isDebugEnabled()) { - LOG.debug("Pool Closed"); - } - synchronized(lockObject) { - for(NettyClientBase eachClient: connections.values()) { - try { - eachClient.close(); - } catch (Exception e) { - LOG.error("close client pool error", e); - } - } - - connections.clear(); - } - - try { - accepted.close(); - } catch (Throwable t) { - LOG.error(t, t); - } - } - - public synchronized void shutdown(){ - close(); - RpcChannelFactory.shutdownGracefully(); - } - - static class RpcConnectionKey { - final InetSocketAddress addr; - final Class<?> protocolClass; - final boolean asyncMode; - - public RpcConnectionKey(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) { - this.addr = addr; - this.protocolClass = protocolClass; - this.asyncMode = asyncMode; - } - - @Override - public String toString() { - return "["+ protocolClass + "] " + addr + "," + asyncMode; - } - - @Override - public boolean equals(Object obj) { - if(!(obj instanceof RpcConnectionKey)) { - return false; - } - - return toString().equals(obj.toString()); - } - - @Override - public int hashCode() { - return Objects.hashCode(addr, asyncMode); - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java index fb1cec2..2804a03 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java @@ -18,13 +18,11 @@ package org.apache.tajo.rpc; +import com.google.protobuf.ServiceException; + import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; - -import com.google.protobuf.ServiceException; public abstract class ServerCallable<T> { protected InetSocketAddress addr; @@ -33,21 +31,16 @@ public abstract class ServerCallable<T> { protected Class<?> protocol; protected boolean asyncMode; protected boolean closeConn; - protected RpcConnectionPool connPool; + protected RpcClientManager manager; public abstract T call(NettyClientBase client) throws Exception; - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) { - this(connPool, addr, protocol, asyncMode, false); - } - - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, - boolean asyncMode, boolean closeConn) { - this.connPool = connPool; + public ServerCallable(RpcClientManager manager, InetSocketAddress addr, Class<?> protocol, + boolean asyncMode) { + this.manager = manager; this.addr = addr; this.protocol = protocol; this.asyncMode = asyncMode; - this.closeConn = closeConn; } public void beforeCall() { @@ -74,26 +67,24 @@ public abstract class ServerCallable<T> { * Run this instance with retries, timed waits, * and refinds of missing regions. * - * @param <T> the type of the return value * @return an object of type T * @throws com.google.protobuf.ServiceException if a remote or network exception occurs */ + public T withRetries() throws ServiceException { //TODO configurable final long pause = 500; //ms final int numRetries = 3; - List<Throwable> exceptions = new ArrayList<Throwable>(); for (int tries = 0; tries < numRetries; tries++) { NettyClientBase client = null; try { beforeCall(); if(addr != null) { - client = connPool.getConnection(addr, protocol, asyncMode); + client = manager.getClient(addr, protocol, asyncMode); } return call(client); } catch (IOException ioe) { - exceptions.add(ioe); if(abort) { throw new ServiceException(ioe.getMessage(), ioe); } @@ -105,9 +96,7 @@ public abstract class ServerCallable<T> { } finally { afterCall(); if(closeConn) { - connPool.closeConnection(client); - } else { - connPool.releaseConnection(client); + RpcClientManager.cleanup(client); } } try { @@ -122,7 +111,6 @@ public abstract class ServerCallable<T> { /** * Run this instance against the server once. - * @param <T> the type of the return value * @return an object of type T * @throws java.io.IOException if a remote or network exception occurs * @throws RuntimeException other unspecified error @@ -131,7 +119,7 @@ public abstract class ServerCallable<T> { NettyClientBase client = null; try { beforeCall(); - client = connPool.getConnection(addr, protocol, asyncMode); + client = manager.getClient(addr, protocol, asyncMode); return call(client); } catch (Throwable t) { Throwable t2 = translateException(t); @@ -143,9 +131,7 @@ public abstract class ServerCallable<T> { } finally { afterCall(); if(closeConn) { - connPool.closeConnection(client); - } else { - connPool.releaseConnection(client); + RpcClientManager.cleanup(client); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 31d5265..1e4959b 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -19,6 +19,7 @@ package org.apache.tajo.rpc; import com.google.protobuf.RpcCallback; +import io.netty.channel.ConnectTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.test.DummyProtocol; @@ -34,8 +35,6 @@ import org.junit.rules.ExternalResource; import org.junit.runner.Description; import org.junit.runners.model.Statement; -import io.netty.channel.ConnectTimeoutException; - import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; @@ -60,17 +59,17 @@ public class TestAsyncRpc { Interface stub; DummyProtocolAsyncImpl service; int retries; - + @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface SetupRpcConnection { boolean setupRpcServer() default true; boolean setupRpcClient() default true; } - + @Rule public ExternalResource resource = new ExternalResource() { - + private Description description; @Override @@ -86,7 +85,7 @@ public class TestAsyncRpc { if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { setUpRpcServer(); } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { setUpRpcClient(); } @@ -103,7 +102,7 @@ public class TestAsyncRpc { fail(e.getMessage()); } } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { try { tearDownRpcServer(); @@ -112,21 +111,25 @@ public class TestAsyncRpc { } } } - + }; - + public void setUpRpcServer() throws Exception { service = new DummyProtocolAsyncImpl(); server = new AsyncRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); } - + public void setUpRpcClient() throws Exception { retries = 1; - client = new AsyncRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(server.getListenAddress()), retries); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + client.connect(); stub = client.getStub(); } @@ -134,14 +137,14 @@ public class TestAsyncRpc { public static void tearDownClass() throws Exception { RpcChannelFactory.shutdownGracefully(); } - + public void tearDownRpcServer() throws Exception { if(server != null) { server.shutdown(); server = null; } } - + public void tearDownRpcClient() throws Exception { if(client != null) { client.close(); @@ -296,7 +299,11 @@ public class TestAsyncRpc { }); serverThread.start(); - client = new AsyncRpcClient(DummyProtocol.class, address, retries); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + client.connect(); + assertTrue(client.isConnected()); stub = client.getStub(); stub.echo(future.getController(), echoMessage, future); @@ -310,7 +317,10 @@ public class TestAsyncRpc { InetSocketAddress address = new InetSocketAddress("test", 0); boolean expected = false; try { - new AsyncRpcClient(DummyProtocol.class, address, retries); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); + NettyClientBase client = new AsyncRpcClient(rpcConnectionKey, retries); + client.connect(); fail(); } catch (ConnectTimeoutException e) { expected = true; @@ -318,14 +328,19 @@ public class TestAsyncRpc { fail(); } assertTrue(expected); + } @Test @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - client = new AsyncRpcClient(DummyProtocol.class, - RpcUtils.createUnresolved(hostAndPort), retries); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); + client = new AsyncRpcClient(rpcConnectionKey, retries); + client.connect(); + assertTrue(client.isConnected()); Interface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); @@ -336,4 +351,43 @@ public class TestAsyncRpc { assertEquals(future.get(), echoMessage); assertTrue(future.isDone()); } + + @Test + public void testIdleTimeout() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + client.connect(); + assertTrue(client.isConnected()); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + + client.connect(); // try to reconnect + assertTrue(client.isConnected()); + client.close(); + assertFalse(client.isConnected()); + } + + @Test + public void testIdleTimeoutWithActiveRequest() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + client.connect(); + + assertTrue(client.isConnected()); + Interface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); + stub.deley(null, echoMessage, future); //3 sec delay + + assertFalse(future.isDone()); + assertEquals(future.get(), echoMessage); + assertTrue(future.isDone()); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 07e2dca..8c0b475 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -18,6 +18,7 @@ package org.apache.tajo.rpc; +import io.netty.channel.ConnectTimeoutException; import org.apache.tajo.rpc.test.DummyProtocol; import org.apache.tajo.rpc.test.DummyProtocol.DummyProtocolService.BlockingInterface; import org.apache.tajo.rpc.test.TestProtos.EchoMessage; @@ -35,7 +36,6 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; @@ -51,17 +51,17 @@ public class TestBlockingRpc { private BlockingInterface stub; private DummyProtocolBlockingImpl service; private int retries; - + @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) @interface SetupRpcConnection { boolean setupRpcServer() default true; boolean setupRpcClient() default true; } - + @Rule public ExternalResource resource = new ExternalResource() { - + private Description description; @Override @@ -73,11 +73,11 @@ public class TestBlockingRpc { @Override protected void before() throws Throwable { SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { setUpRpcServer(); } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { setUpRpcClient(); } @@ -86,7 +86,7 @@ public class TestBlockingRpc { @Override protected void after() { SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { try { tearDownRpcClient(); @@ -94,7 +94,7 @@ public class TestBlockingRpc { fail(e.getMessage()); } } - + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { try { tearDownRpcServer(); @@ -103,21 +103,26 @@ public class TestBlockingRpc { } } } - + }; - + public void setUpRpcServer() throws Exception { service = new DummyProtocolBlockingImpl(); server = new BlockingRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); } - + public void setUpRpcClient() throws Exception { retries = 1; - client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(server.getListenAddress()), retries); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), + DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + client.connect(); + assertTrue(client.isConnected()); stub = client.getStub(); } @@ -125,14 +130,14 @@ public class TestBlockingRpc { public static void tearDownClass() throws Exception { RpcChannelFactory.shutdownGracefully(); } - + public void tearDownRpcServer() throws Exception { if(server != null) { server.shutdown(); server = null; } } - + public void tearDownRpcClient() throws Exception { if(client != null) { client.close(); @@ -159,7 +164,7 @@ public class TestBlockingRpc { @Test @SetupRpcConnection(setupRpcClient=false) public void testRpcWithServiceCallable() throws Exception { - RpcConnectionPool pool = RpcConnectionPool.getPool(); + RpcClientManager manager = RpcClientManager.getInstance(); final SumRequest request = SumRequest.newBuilder() .setX1(1) .setX2(2) @@ -167,20 +172,20 @@ public class TestBlockingRpc { .setX4(2.0f).build(); SumResponse response = - new ServerCallable<SumResponse>(pool, - server.getListenAddress(), DummyProtocol.class, false) { - @Override - public SumResponse call(NettyClientBase client) throws Exception { - BlockingInterface stub2 = client.getStub(); - SumResponse response1 = stub2.sum(null, request); - return response1; - } - }.withRetries(); + new ServerCallable<SumResponse>(manager, + server.getListenAddress(), DummyProtocol.class, false) { + @Override + public SumResponse call(NettyClientBase client) throws Exception { + BlockingInterface stub2 = client.getStub(); + SumResponse response1 = stub2.sum(null, request); + return response1; + } + }.withRetries(); assertEquals(8.15d, response.getResult(), 1e-15); response = - new ServerCallable<SumResponse>(pool, + new ServerCallable<SumResponse>(manager, server.getListenAddress(), DummyProtocol.class, false) { @Override public SumResponse call(NettyClientBase client) throws Exception { @@ -191,7 +196,7 @@ public class TestBlockingRpc { }.withoutRetries(); assertTrue(8.15d == response.getResult()); - pool.close(); + RpcClientManager.close(); } @Test @@ -213,6 +218,22 @@ public class TestBlockingRpc { } @Test + public void testThrowException2() throws Exception { + EchoMessage message = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + try { + stub.throwException(null, message); + fail("RpcCall should throw exception"); + } catch (Throwable t) { + assertTrue(t instanceof TajoServiceException); + } + + EchoMessage message1 = stub.deley(null, message); + assertEquals(message, message1); + } + + @Test @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) public void testConnectionRetry() throws Exception { retries = 10; @@ -238,7 +259,11 @@ public class TestBlockingRpc { }); serverThread.start(); - client = new BlockingRpcClient(DummyProtocol.class, address, retries); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + client.connect(); + assertTrue(client.isConnected()); stub = client.getStub(); EchoMessage response = stub.echo(null, message); @@ -247,22 +272,21 @@ public class TestBlockingRpc { @Test public void testConnectionFailed() throws Exception { - boolean expected = false; NettyClientBase client = null; - + boolean expected = false; try { int port = server.getListenAddress().getPort() + 1; - client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries); - client.close(); - fail("Connection should be failed."); - } catch (ConnectException ce) { - expected = true; - } catch (Throwable ce){ - if (client != null) { - client.close(); - } + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( + RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), + DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + client.connect(); fail(); + } catch (ConnectTimeoutException e) { + expected = true; + } catch (Throwable e) { + fail(e.getMessage()); } assertTrue(expected); } @@ -329,8 +353,12 @@ public class TestBlockingRpc { @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.createUnresolved(hostAndPort), retries); + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey( + RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); + client = new BlockingRpcClient(rpcConnectionKey, retries); + client.connect(); + assertTrue(client.isConnected()); BlockingInterface stub = client.getStub(); EchoMessage message = EchoMessage.newBuilder() @@ -338,4 +366,41 @@ public class TestBlockingRpc { EchoMessage response2 = stub.echo(null, message); assertEquals(MESSAGE, response2.getMessage()); } + + @Test + public void testIdleTimeout() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + client.connect(); + assertTrue(client.isConnected()); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + + client.connect(); // try to reconnect + assertTrue(client.isConnected()); + client.close(); + assertFalse(client.isConnected()); + } + + @Test + public void testIdleTimeoutWithActiveRequest() throws Exception { + RpcClientManager.RpcConnectionKey rpcConnectionKey = + new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries, 1); //1 sec idle timeout + + client.connect(); + + assertTrue(client.isConnected()); + BlockingInterface stub = client.getStub(); + EchoMessage echoMessage = EchoMessage.newBuilder() + .setMessage(MESSAGE).build(); + + EchoMessage message = stub.deley(null, echoMessage); //3 sec delay + assertEquals(message, echoMessage); + + Thread.sleep(2000); + assertFalse(client.isConnected()); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/47008c58/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java new file mode 100644 index 0000000..5f86518 --- /dev/null +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import org.apache.tajo.rpc.test.DummyProtocol; +import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestRpcClientManager { + + @Test + public void testRaceCondition() throws Exception { + final int parallelCount = 50; + final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), parallelCount); + server.start(); + + final InetSocketAddress address = server.getListenAddress(); + final RpcClientManager manager = RpcClientManager.getInstance(); + + ExecutorService executor = Executors.newFixedThreadPool(parallelCount); + List<Future> tasks = new ArrayList<Future>(); + for (int i = 0; i < parallelCount; i++) { + tasks.add(executor.submit(new Runnable() { + @Override + public void run() { + NettyClientBase client = null; + try { + client = manager.getClient(address, DummyProtocol.class, false); + } catch (Throwable e) { + fail(e.getMessage()); + } + assertTrue(client.isConnected()); + } + }) + ); + } + + for (Future future : tasks) { + future.get(); + } + + NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false); + RpcClientManager.cleanup(clientBase); + server.shutdown(); + executor.shutdown(); + } + + @Test + public void testCloseFuture() throws Exception { + final DummyProtocolAsyncImpl service = new DummyProtocolAsyncImpl(); + NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, + service, new InetSocketAddress("127.0.0.1", 0), 3); + server.start(); + + final RpcClientManager manager = RpcClientManager.getInstance(); + + NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true); + assertTrue(client.isConnected()); + assertTrue(client.getChannel().isWritable()); + + RpcClientManager.RpcConnectionKey key = client.getKey(); + assertTrue(RpcClientManager.contains(key)); + + client.close(); + assertFalse(RpcClientManager.contains(key)); + server.shutdown(); + } +} \ No newline at end of file
