Repository: tajo Updated Branches: refs/heads/branch-0.11.0 a6490bdde -> 4a22d1da4
http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index a860d51..f8d5c45 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -19,21 +19,32 @@ package org.apache.tajo.jdbc; import com.google.common.collect.Maps; +import io.netty.channel.ConnectTimeoutException; import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.error.Errors; +import org.apache.tajo.exception.SQLExceptionUtil; +import org.apache.tajo.util.UriUtil; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; import java.sql.*; import java.util.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.error.Errors.ResultCode.CLIENT_CONNECTION_EXCEPTION; +import static org.apache.tajo.error.Errors.ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION; import static org.junit.Assert.*; @Category(IntegrationTest.class) @@ -197,7 +208,7 @@ public class TestTajoJdbc extends QueryTestCaseBase { public void testResultSetCompression() throws Exception { String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), TajoConstants.DEFAULT_DATABASE_NAME); - connUri = connUri + "?" + SessionVars.COMPRESSED_RESULT_TRANSFER.keyname() + "=true"; + connUri = connUri + "?useCompression=true"; Connection conn = DriverManager.getConnection(connUri); assertTrue(conn.isValid(100)); @@ -593,7 +604,7 @@ public class TestTajoJdbc extends QueryTestCaseBase { try { if (!testingCluster.isHiveCatalogStoreRunning()) { String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), - tajoMasterAddress.getPort(), "TestTajoJdbc"); + tajoMasterAddress.getPort(), "TestTajoJdbc"); conn = DriverManager.getConnection(connUri); assertTrue(conn.isValid(100)); http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java ---------------------------------------------------------------------- diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java index 8f84226..1050fc0 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbcNegative.java @@ -21,7 +21,10 @@ package org.apache.tajo.jdbc; import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.error.Errors.ResultCode; +import org.apache.tajo.exception.SQLExceptionUtil; +import org.apache.tajo.util.UriUtil; import org.junit.AfterClass; +import org.junit.Assume; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -29,9 +32,13 @@ import org.junit.experimental.categories.Category; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketTimeoutException; import java.sql.*; +import java.util.Properties; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.error.Errors.ResultCode.CLIENT_CONNECTION_EXCEPTION; import static org.apache.tajo.exception.SQLExceptionUtil.toSQLState; import static org.apache.tajo.jdbc.TestTajoJdbc.buildConnectionUri; import static org.junit.Assert.*; @@ -53,7 +60,7 @@ public class TestTajoJdbcNegative extends QueryTestCaseBase { @Test(expected = SQLException.class) public void testGetConnection() throws SQLException { DriverManager.getConnection("jdbc:taju://" + tajoMasterAddress.getHostName() + ":" + tajoMasterAddress.getPort() - + "/default"); + + "/default"); } @Test @@ -191,4 +198,49 @@ public class TestTajoJdbcNegative extends QueryTestCaseBase { } } } + + private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException { + try (Socket socket = new Socket()) { + // Try to connect to a private address in the 10.x.y.z range. + // These addresses are usually not routed, so an attempt to + // connect to them will hang the connection attempt, which is + // what we want to simulate in this test. + socket.connect(new InetSocketAddress(host, port), connectTimeout); + // Abort the test if we can connect. + Assume.assumeTrue(false); + } catch (SocketTimeoutException x) { + // Expected timeout during connect, continue the test. + Assume.assumeTrue(true); + } catch (Throwable x) { + // Abort if any other exception happens. + Assume.assumeTrue(false); + } + } + + @Test(timeout = 5000) + public final void testConnectTimeout() throws Exception { + final String host = "10.255.255.1"; + final int port = 80; + int connectTimeout = 1000; + assumeConnectTimeout(host, port, connectTimeout); + + long startTime = Long.MIN_VALUE; + long endTime; + try { + // artificially cause connection timeout + String connUri = buildConnectionUri(host, port, DEFAULT_DATABASE_NAME); + connUri = UriUtil.addParam(connUri, "connectTimeout", "1"); // 1 seconds + connUri = UriUtil.addParam(connUri, "retry", "0"); // 1 seconds + startTime = System.currentTimeMillis(); + new JdbcConnection(connUri, new Properties()); + fail("Must be failed"); + } catch (SQLException t) { + endTime = System.currentTimeMillis(); + assertEquals(t.getSQLState(), SQLExceptionUtil.toSQLState(CLIENT_CONNECTION_EXCEPTION)); + assertEquals("connection timed out: /10.255.255.1:80", t.getMessage()); + // default is 15 seconds. So, if timeout is shorter than 1~2 seconds. + // We can ensure the parameter was effective. + assertTrue(((endTime - startTime) / 1000) < 2); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java index ab0826f..601f3d2 100644 --- a/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java +++ b/tajo-rpc/tajo-rpc-common/src/main/java/org/apache/tajo/rpc/RpcConstants.java @@ -18,15 +18,33 @@ package org.apache.tajo.rpc; +import java.util.concurrent.TimeUnit; + +/** + * Constants for RPC + */ public class RpcConstants { public static final String PING_PACKET = "TAJO"; - public static final String RPC_CLIENT_RETRY_MAX = "tajo.rpc.client.retry.max"; - public static final String RPC_CLIENT_TIMEOUT_SECS = "tajo.rpc.client.timeout-secs"; - - public static final int DEFAULT_RPC_RETRIES = 3; - public static final int DEFAULT_RPC_TIMEOUT_SECONDS = 180; - public static final int DEFAULT_CONNECT_TIMEOUT = 20000; // 20 sec public static final int DEFAULT_PAUSE = 1000; // 1 sec - public static final int DEFAULT_FUTURE_TIMEOUT_SECONDS = 10; + public static final int FUTURE_TIMEOUT_SECONDS_DEFAULT = 10; + + /** How many times the connect will retry */ + public static final String CLIENT_RETRY_NUM = "tajo.rpc.client.retry-num"; + public static final int CLIENT_RETRY_NUM_DEFAULT = 0; + + /** Client connection timeout (milliseconds) */ + public static final String CLIENT_CONNECTION_TIMEOUT = "tajo.rpc.client.connection-timeout-ms"; + /** Default client connection timeout 15 seconds */ + public final static long CLIENT_CONNECTION_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(15); + + /** + * Socket timeout (milliseconds). + */ + public static final String CLIENT_SOCKET_TIMEOUT = "tajo.rpc.client.socket-timeout-ms"; + /** Default socket timeout - 60 seconds */ + public final static long CLIENT_SOCKET_TIMEOUT_DEFAULT = TimeUnit.SECONDS.toMillis(180); + + public static final String CLIENT_HANG_DETECTION = "tajo.rpc.client.hang-detection"; + public final static boolean CLIENT_HANG_DETECTION_DEFAULT = false; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 6fb62d4..c613bac 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -18,56 +18,55 @@ package org.apache.tajo.rpc; -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.*; import io.netty.channel.ChannelHandler; import io.netty.channel.EventLoopGroup; -import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcResponse; import java.lang.reflect.Method; -import java.util.concurrent.TimeUnit; +import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.tajo.rpc.RpcConstants.*; + public class AsyncRpcClient extends NettyClientBase<AsyncRpcClient.ResponseCallback> { private final Method stubMethod; private final ProxyRpcChannel rpcChannel; private final NettyChannelInboundHandler handler; - @VisibleForTesting - AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries) - throws ClassNotFoundException, NoSuchMethodException { - this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup()); - } /** * Intentionally make this method package-private, avoiding user directly * new an instance through this constructor. * - * @param rpcConnectionKey - * @param retries retry operation number of times - * @param timeout disable ping, it trigger timeout event on idle-state. - * otherwise it is request timeout on active-state - * @param timeUnit TimeUnit - * @param enablePing enable to detect remote peer hangs - * @param eventLoopGroup thread pool of netty's + * @param rpcConnectionKey RpcConnectionKey + * @param eventLoopGroup Thread pool of netty's + * @param rpcParams Rpc connection parameters (see RpcConstants) + * * @throws ClassNotFoundException * @throws NoSuchMethodException */ - AsyncRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing, - EventLoopGroup eventLoopGroup) + AsyncRpcClient(EventLoopGroup eventLoopGroup, + RpcConnectionKey rpcConnectionKey, + Properties rpcParams) throws ClassNotFoundException, NoSuchMethodException { - super(rpcConnectionKey, retries); + super(rpcConnectionKey, rpcParams); this.stubMethod = getServiceClass().getMethod("newStub", RpcChannel.class); this.rpcChannel = new ProxyRpcChannel(); this.handler = new ClientChannelInboundHandler(); - init(new ProtoClientChannelInitializer(handler, - RpcResponse.getDefaultInstance(), - timeUnit.toNanos(timeout), - enablePing), eventLoopGroup); + + final long socketTimeoutMills = Long.parseLong( + rpcParams.getProperty(CLIENT_SOCKET_TIMEOUT, String.valueOf(CLIENT_SOCKET_TIMEOUT_DEFAULT))); + + // Enable proactive hang detection + final boolean hangDetectionEnabled = Boolean.parseBoolean( + rpcParams.getProperty(CLIENT_HANG_DETECTION, String.valueOf(CLIENT_HANG_DETECTION_DEFAULT))); + + init(new ProtoClientChannelInitializer(handler, RpcResponse.getDefaultInstance(), socketTimeoutMills, + hangDetectionEnabled), eventLoopGroup); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 4327003..35675b4 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -18,56 +18,57 @@ package org.apache.tajo.rpc; -import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; import io.netty.channel.ChannelHandler; import io.netty.channel.EventLoopGroup; -import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcResponse; import java.lang.reflect.Method; import java.net.InetSocketAddress; +import java.util.Properties; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.tajo.rpc.RpcConstants.*; + public class BlockingRpcClient extends NettyClientBase<BlockingRpcClient.ProtoCallFuture> { private final Method stubMethod; private final ProxyRpcChannel rpcChannel; private final NettyChannelInboundHandler handler; - @VisibleForTesting - BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries) - throws NoSuchMethodException, ClassNotFoundException { - this(rpcConnectionKey, retries, 0, TimeUnit.NANOSECONDS, false, NettyUtils.getDefaultEventLoopGroup()); - } - /** * Intentionally make this method package-private, avoiding user directly * new an instance through this constructor. * - * @param rpcConnectionKey - * @param retries retry operation number of times - * @param timeout disable ping, it trigger timeout event on idle-state. - * otherwise it is request timeout on active-state - * @param timeUnit TimeUnit - * @param enablePing enable to detect remote peer hangs - * @param eventLoopGroup thread pool of netty's + * @param rpcConnectionKey RpcConnectionKey + * @param eventLoopGroup Thread pool of netty's + * @param rpcParams Rpc connection parameters (see RpcConstants) + * * @throws ClassNotFoundException * @throws NoSuchMethodException + * @see RpcConstants */ - BlockingRpcClient(RpcConnectionKey rpcConnectionKey, int retries, long timeout, TimeUnit timeUnit, boolean enablePing, - EventLoopGroup eventLoopGroup) throws ClassNotFoundException, NoSuchMethodException { - super(rpcConnectionKey, retries); + public BlockingRpcClient(EventLoopGroup eventLoopGroup, + RpcConnectionKey rpcConnectionKey, + Properties rpcParams) + throws ClassNotFoundException, NoSuchMethodException { + super(rpcConnectionKey, rpcParams); this.stubMethod = getServiceClass().getMethod("newBlockingStub", BlockingRpcChannel.class); this.rpcChannel = new ProxyRpcChannel(); this.handler = new ClientChannelInboundHandler(); - init(new ProtoClientChannelInitializer(handler, - RpcResponse.getDefaultInstance(), - timeUnit.toNanos(timeout), - enablePing), eventLoopGroup); + + long socketTimeoutMills = Long.parseLong( + rpcParams.getProperty(CLIENT_SOCKET_TIMEOUT, String.valueOf(CLIENT_SOCKET_TIMEOUT_DEFAULT))); + + // Enable proactive hang detection + final boolean hangDetectionEnabled = Boolean.parseBoolean( + rpcParams.getProperty(CLIENT_HANG_DETECTION, String.valueOf(CLIENT_HANG_DETECTION_DEFAULT))); + + init(new ProtoClientChannelInitializer(handler, RpcResponse.getDefaultInstance(), socketTimeoutMills, + hangDetectionEnabled), eventLoopGroup); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index e5485da..6008c4c 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -18,6 +18,7 @@ package org.apache.tajo.rpc; +import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; @@ -31,7 +32,6 @@ import io.netty.util.concurrent.GenericFutureListener; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.tajo.rpc.RpcClientManager.RpcConnectionKey; import org.apache.tajo.rpc.RpcProtos.RpcResponse; import java.io.Closeable; @@ -41,27 +41,50 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.UnresolvedAddressException; import java.util.Collection; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import static org.apache.tajo.rpc.RpcConstants.*; + public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable { public final static Log LOG = LogFactory.getLog(NettyClientBase.class); - private Bootstrap bootstrap; - private volatile ChannelFuture channelFuture; private final RpcConnectionKey key; - private final int maxRetries; + /** Number to retry for connection and RPC invocation */ + private final int maxRetryNum; + /** Connection Timeout */ + private final long connTimeoutMillis; private boolean enableMonitor; - - private final ConcurrentMap<RpcConnectionKey, ChannelEventListener> channelEventListeners = - new ConcurrentHashMap<>(); + private final ConcurrentMap<RpcConnectionKey, ChannelEventListener> channelEventListeners = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, T> requests = new ConcurrentHashMap<>(); - public NettyClientBase(RpcConnectionKey rpcConnectionKey, int numRetries) + private Bootstrap bootstrap; + private volatile ChannelFuture channelFuture; + + /** + * Constructor of NettyClientBase + * + * @param rpcConnectionKey RpcConnectionKey + * @param rpcParams Rpc connection parameters (see RpcConstants) + * + * @throws ClassNotFoundException + * @throws NoSuchMethodException + * @see RpcConstants + */ + public NettyClientBase(RpcConnectionKey rpcConnectionKey, Properties rpcParams) throws ClassNotFoundException, NoSuchMethodException { this.key = rpcConnectionKey; - this.maxRetries = numRetries; + + this.maxRetryNum = Integer.parseInt( + rpcParams.getProperty(CLIENT_RETRY_NUM, String.valueOf(CLIENT_RETRY_NUM_DEFAULT))); + + this.connTimeoutMillis = Integer.parseInt( + rpcParams.getProperty(CLIENT_CONNECTION_TIMEOUT, String.valueOf(CLIENT_CONNECTION_TIMEOUT_DEFAULT))); + + // Netty only takes integer value range and this is to avoid integer overflow. + Preconditions.checkArgument(this.connTimeoutMillis <= Integer.MAX_VALUE, "Too long connection timeout"); } // should be called from sub class @@ -73,12 +96,12 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable .handler(initializer) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.SO_REUSEADDR, true) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, RpcConstants.DEFAULT_CONNECT_TIMEOUT) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) connTimeoutMillis) .option(ChannelOption.SO_RCVBUF, 1048576 * 10) .option(ChannelOption.TCP_NODELAY, true); } - public RpcClientManager.RpcConnectionKey getKey() { + public RpcConnectionKey getKey() { return key; } @@ -132,7 +155,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable getHandler().registerCallback(rpcRequest.getId(), callback); } else { - if (!future.channel().isActive() && retry < maxRetries) { + if (!future.channel().isActive() && retry < maxRetryNum) { /* schedule the current request for the retry */ LOG.warn(future.cause() + " Try to reconnect :" + getKey().addr); @@ -173,6 +196,14 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable return this.channelFuture = bootstrap.clone().connect(address); } + private ConnectException makeConnectException(InetSocketAddress address, ChannelFuture future) { + if (future.cause() instanceof UnresolvedAddressException) { + return new ConnectException("Can't resolve host name: " + address.toString()); + } else { + return new ConnectTimeoutException(future.cause().getMessage()); + } + } + public synchronized void connect() throws ConnectException { if (isConnected()) return; @@ -186,10 +217,10 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable ChannelFuture f = doConnect(address).awaitUninterruptibly(); if (!f.isSuccess()) { - if (maxRetries > 0) { + if (maxRetryNum > 0) { doReconnect(address, f, ++retries); } else { - throw new ConnectException(ExceptionUtils.getMessage(f.cause())); + throw makeConnectException(address, f); } } } @@ -198,7 +229,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable throws ConnectException { for (; ; ) { - if (maxRetries > retries) { + if (maxRetryNum > retries) { retries++; if(getChannel().eventLoop().isShuttingDown()) { @@ -218,12 +249,7 @@ public abstract class NettyClientBase<T> implements ProtoDeclaration, Closeable } } else { LOG.error("Max retry count has been exceeded. attempts=" + retries + " caused by: " + future.cause()); - - if (future.cause() instanceof UnresolvedAddressException) { - throw new ConnectException("Can't resolve host name: " + address.toString()); - } else { - throw new ConnectTimeoutException(future.cause().getMessage()); - } + throw makeConnectException(address, future); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java index 8787dee..5d544cb 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/ProtoClientChannelInitializer.java @@ -34,26 +34,33 @@ import java.util.concurrent.TimeUnit; class ProtoClientChannelInitializer extends ChannelInitializer<Channel> { private final MessageLite defaultInstance; private final ChannelHandler handler; - private final long timeoutTimeNanos; - private final boolean enablePing; + private final long idleTimeout; + private final boolean hangDetection; + /** + * Channel Pipe Initializer + * + * @param handler Channel Handler + * @param defaultInstance Default Rpc Proto instance + * @param idleTimeout Idle timeout (milliseconds) + */ public ProtoClientChannelInitializer(ChannelHandler handler, MessageLite defaultInstance, - long timeoutTimeNanos, - boolean enablePing) { + long idleTimeout, boolean hangDetection) { this.handler = handler; this.defaultInstance = defaultInstance; - this.timeoutTimeNanos = timeoutTimeNanos; - this.enablePing = enablePing; + this.idleTimeout = idleTimeout; + this.hangDetection = hangDetection; } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast("idleStateHandler", - new IdleStateHandler(timeoutTimeNanos, timeoutTimeNanos / 2, 0, TimeUnit.NANOSECONDS)); - - if (enablePing) pipeline.addLast("MonitorClientHandler", new MonitorClientHandler()); + new IdleStateHandler(idleTimeout, idleTimeout / 2, 0, TimeUnit.MILLISECONDS)); + if (hangDetection) { + pipeline.addLast("MonitorClientHandler", new MonitorClientHandler()); + } pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java index c801b8a..032cf35 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -33,15 +33,12 @@ import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.Properties; @ThreadSafe public class RpcClientManager { private static final Log LOG = LogFactory.getLog(RpcClientManager.class); - private volatile int timeoutSeconds = RpcConstants.DEFAULT_RPC_TIMEOUT_SECONDS; - private volatile int retries = RpcConstants.DEFAULT_RPC_RETRIES; - /* entries will be removed by ConnectionCloseFutureListener */ private static final Map<RpcConnectionKey, NettyClientBase> clients = Collections.synchronizedMap(new HashMap<RpcConnectionKey, NettyClientBase>()); @@ -61,26 +58,23 @@ public class RpcClientManager { } private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey, - int retries, - long timeout, - TimeUnit timeUnit, - boolean enablePing) + Properties rpcParams) throws NoSuchMethodException, ConnectException, ClassNotFoundException { - return makeClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, NettyUtils.getDefaultEventLoopGroup()); + + + return makeClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, rpcParams); } - private <T extends NettyClientBase> T makeClient(RpcConnectionKey rpcConnectionKey, - int retries, - long timeout, - TimeUnit timeUnit, - boolean enablePing, - EventLoopGroup eventLoopGroup) + private <T extends NettyClientBase> T makeClient(EventLoopGroup eventLoopGroup, + RpcConnectionKey rpcConnectionKey, + Properties rpcParams) throws NoSuchMethodException, ClassNotFoundException, ConnectException { NettyClientBase client; if (rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup); + client = new AsyncRpcClient(eventLoopGroup, rpcConnectionKey, rpcParams); + } else { - client = new BlockingRpcClient(rpcConnectionKey, retries, timeout, timeUnit, enablePing, eventLoopGroup); + client = new BlockingRpcClient(eventLoopGroup, rpcConnectionKey, rpcParams); } return (T) client; } @@ -90,7 +84,9 @@ public class RpcClientManager { * This client will be shared per protocol and address. Client is removed in shared map when a client is closed */ public <T extends NettyClientBase> T getClient(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) + Class<?> protocolClass, + boolean asyncMode, + Properties rpcParams) throws NoSuchMethodException, ClassNotFoundException, ConnectException { RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); @@ -98,7 +94,7 @@ public class RpcClientManager { synchronized (clients) { client = clients.get(key); if (client == null) { - clients.put(key, client = makeClient(key, retries, getTimeoutSeconds(), TimeUnit.SECONDS, true)); + clients.put(key, client = makeClient(key, rpcParams)); } } @@ -129,35 +125,30 @@ public class RpcClientManager { * Connect a {@link NettyClientBase} to the remote {@link NettyServerBase}, and returns rpc client by protocol. * This client does not managed. It should close. */ - public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr, + public <T extends NettyClientBase> T newClient(InetSocketAddress addr, Class<?> protocolClass, boolean asyncMode, - int retries, - long timeout, - TimeUnit timeUnit, - boolean enablePing) - throws NoSuchMethodException, ClassNotFoundException, ConnectException { - - return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), retries, timeout, timeUnit, enablePing); - } - - public synchronized <T extends NettyClientBase> T newClient(InetSocketAddress addr, - Class<?> protocolClass, - boolean asyncMode) + Properties rpcParams) throws NoSuchMethodException, ClassNotFoundException, ConnectException { - return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), - retries, getTimeoutSeconds(), TimeUnit.SECONDS, true); + return newClient(new RpcConnectionKey(addr, protocolClass, asyncMode), rpcParams); } + /** + * + * @param key RpcConnectionKey + * @param <T> Rpc Protocol Class + * @return Rpc Client Class + * @throws NoSuchMethodException + * @throws ClassNotFoundException + * @throws ConnectException + */ public synchronized <T extends NettyClientBase> T newClient(RpcConnectionKey key, - int retries, - long timeout, - TimeUnit timeUnit, - boolean enablePing) + Properties connectionParameters) + throws NoSuchMethodException, ClassNotFoundException, ConnectException { - T client = makeClient(key, retries, timeout, timeUnit, enablePing); + T client = makeClient(key, connectionParameters); client.connect(); assert client.isConnected(); return client; @@ -165,12 +156,11 @@ public class RpcClientManager { public synchronized <T extends NettyClientBase> T newBlockingClient(InetSocketAddress addr, Class<?> protocolClass, - int retries, - EventLoopGroup eventLoopGroup) + EventLoopGroup eventLoopGroup, + Properties rpcParams) throws NoSuchMethodException, ClassNotFoundException, ConnectException { - T client = makeClient(new RpcConnectionKey(addr, protocolClass, false), - retries, 0, TimeUnit.SECONDS, false, eventLoopGroup); + T client = makeClient(eventLoopGroup, new RpcConnectionKey(addr, protocolClass, false), rpcParams); client.connect(); assert client.isConnected(); return client; @@ -220,61 +210,10 @@ public class RpcClientManager { } } - public int getTimeoutSeconds() { - return timeoutSeconds; - } - - public void setTimeoutSeconds(int timeoutSeconds) { - this.timeoutSeconds = timeoutSeconds; - } - - public int getRetries() { - return retries; - } - - public void setRetries(int retries) { - this.retries = retries; - } - - static class RpcConnectionKey { - final InetSocketAddress addr; - final Class<?> protocolClass; - final boolean asyncMode; - - final String description; - - public RpcConnectionKey(InetSocketAddress addr, - Class<?> protocolClass, boolean asyncMode) { - this.addr = addr; - this.protocolClass = protocolClass; - this.asyncMode = asyncMode; - this.description = "[" + protocolClass + "] " + addr + "," + asyncMode; - } - - @Override - public String toString() { - return description; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof RpcConnectionKey)) { - return false; - } - - return toString().equals(obj.toString()); - } - - @Override - public int hashCode() { - return description.hashCode(); - } - } - static class ClientCloseFutureListener implements GenericFutureListener { - private RpcClientManager.RpcConnectionKey key; + private RpcConnectionKey key; - public ClientCloseFutureListener(RpcClientManager.RpcConnectionKey key) { + public ClientCloseFutureListener(RpcConnectionKey key) { this.key = key; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java new file mode 100644 index 0000000..2804010 --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcConnectionKey.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import java.net.InetSocketAddress; + +public class RpcConnectionKey { + final InetSocketAddress addr; + final Class<?> protocolClass; + final boolean asyncMode; + + final String description; + + public RpcConnectionKey(InetSocketAddress addr, + Class<?> protocolClass, boolean asyncMode) { + this.addr = addr; + this.protocolClass = protocolClass; + this.asyncMode = asyncMode; + this.description = "[" + protocolClass + "] " + addr + "," + asyncMode; + } + + @Override + public String toString() { + return description; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof RpcConnectionKey)) { + return false; + } + + return toString().equals(obj.toString()); + } + + @Override + public int hashCode() { + return description.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 8a3f385..6427ffe 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -42,6 +42,7 @@ import java.lang.annotation.Target; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -61,7 +62,7 @@ public class TestAsyncRpc { Interface stub; DummyProtocolAsyncImpl service; int retries; - RpcClientManager.RpcConnectionKey rpcConnectionKey; + RpcConnectionKey rpcConnectionKey; RpcClientManager manager = RpcClientManager.getInstance(); @Retention(RetentionPolicy.RUNTIME) @@ -129,10 +130,16 @@ public class TestAsyncRpc { public void setUpRpcClient() throws Exception { retries = 1; - rpcConnectionKey = new RpcClientManager.RpcConnectionKey( - RpcUtils.getConnectAddress(server.getListenAddress()), - DummyProtocol.class, true); - client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true); + rpcConnectionKey = new RpcConnectionKey( + RpcUtils.getConnectAddress(server.getListenAddress()), DummyProtocol.class, true); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(10))); + connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true"); + + + client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); stub = client.getStub(); } @@ -347,10 +354,13 @@ public class TestAsyncRpc { }); serverThread.start(); - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); - AsyncRpcClient client = manager.newClient(rpcConnectionKey, - retries, 0, TimeUnit.MILLISECONDS, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(address, DummyProtocol.class, true); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + + AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); Interface stub = client.getStub(); @@ -377,9 +387,13 @@ public class TestAsyncRpc { .setMessage(MESSAGE).build(); CallFuture<EchoMessage> future = new CallFuture<>(); - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); - AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(address, DummyProtocol.class, true); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + + AsyncRpcClient client = new AsyncRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams); try { client.connect(); fail(); @@ -409,9 +423,13 @@ public class TestAsyncRpc { boolean expected = false; AsyncRpcClient client = null; try { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); - client = new AsyncRpcClient(rpcConnectionKey, retries); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(address, DummyProtocol.class, true); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + + client = new AsyncRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams); client.connect(); fail(); } catch (ConnectException e) { @@ -429,10 +447,14 @@ public class TestAsyncRpc { @SetupRpcConnection(setupRpcClient = false) public void testUnresolvedAddress2() throws Exception { String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey( RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, true); - AsyncRpcClient client = new AsyncRpcClient(rpcConnectionKey, retries); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + + AsyncRpcClient client = new AsyncRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams); client.connect(); try { assertTrue(client.isConnected()); @@ -453,9 +475,12 @@ public class TestAsyncRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testStubRecovery() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); - AsyncRpcClient client = manager.newClient(rpcConnectionKey, 2, 0, TimeUnit.MILLISECONDS, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(2)); + AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); @@ -484,10 +509,15 @@ public class TestAsyncRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testIdleTimeout() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); - //500 millis idle timeout - AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + + // 500 millis idle timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + + AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); Thread.sleep(600); //timeout @@ -504,11 +534,16 @@ public class TestAsyncRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testPingOnIdle() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); - //500 millis request timeout - AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + // 500 millis idle timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true"); + + AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); Thread.sleep(600); @@ -522,10 +557,15 @@ public class TestAsyncRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testIdleTimeoutWithActiveRequest() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); - //500 millis idle timeout - AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + + // 500 millis idle timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + + AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); Interface stub = client.getStub(); @@ -547,11 +587,16 @@ public class TestAsyncRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testRequestTimeoutOnBusy() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + + // 500 millis idle timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(retries)); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true"); - //500 millis request timeout - AsyncRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + AsyncRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); Interface stub = client.getStub(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 0fae7ee..0687d0b 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -39,6 +39,7 @@ import java.lang.annotation.Target; import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; +import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -119,11 +120,16 @@ public class TestBlockingRpc { public void setUpRpcClient() throws Exception { retries = 1; - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, "1"); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(TimeUnit.SECONDS.toMillis(10))); + connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true"); + + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey( RpcUtils.getConnectAddress(server.getListenAddress()), DummyProtocol.class, false); - client = manager.newClient(rpcConnectionKey, retries, 10, TimeUnit.SECONDS, true); + client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); stub = client.getStub(); } @@ -317,11 +323,13 @@ public class TestBlockingRpc { }); serverThread.start(); - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(address, DummyProtocol.class, false); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); - BlockingRpcClient client = manager.newClient(rpcConnectionKey, - retries, 0, TimeUnit.MILLISECONDS, false); + BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); BlockingInterface stub = client.getStub(); @@ -342,9 +350,14 @@ public class TestBlockingRpc { EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, false); - BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(address, DummyProtocol.class, false); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); + + BlockingRpcClient client = new BlockingRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, + connParams); try { client.connect(); @@ -370,9 +383,13 @@ public class TestBlockingRpc { boolean expected = false; BlockingRpcClient client = null; try { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(address, DummyProtocol.class, true); - client = new BlockingRpcClient(rpcConnectionKey, retries); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(address, DummyProtocol.class, true); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); + + client = new BlockingRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams); client.connect(); fail(); } catch (ConnectException e) { @@ -388,11 +405,17 @@ public class TestBlockingRpc { @Test(timeout = 120000) @SetupRpcConnection(setupRpcClient = false) public void testUnresolvedAddress2() throws Exception { + String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey( + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey( RpcUtils.createUnresolved(hostAndPort), DummyProtocol.class, false); - BlockingRpcClient client = new BlockingRpcClient(rpcConnectionKey, retries); + + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); + + BlockingRpcClient client = + new BlockingRpcClient(NettyUtils.getDefaultEventLoopGroup(), rpcConnectionKey, connParams); client.connect(); assertTrue(client.isConnected()); @@ -410,9 +433,11 @@ public class TestBlockingRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testStubRecovery() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); - BlockingRpcClient client = manager.newClient(rpcConnectionKey, 1, 0, TimeUnit.MILLISECONDS, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, String.valueOf(1)); + BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); @@ -440,10 +465,15 @@ public class TestBlockingRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testIdleTimeout() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); - //500 millis idle timeout - BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + + // 500 millis socket timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + + BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); Thread.sleep(600); //timeout @@ -460,11 +490,16 @@ public class TestBlockingRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testPingOnIdle() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + + // 500 millis socket timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true"); - //500 millis request timeout - BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); Thread.sleep(600); @@ -478,10 +513,15 @@ public class TestBlockingRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testIdleTimeoutWithActiveRequest() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); - //500 millis idle timeout - BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + + // 500 millis socket timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + + BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); BlockingInterface stub = client.getStub(); @@ -500,11 +540,16 @@ public class TestBlockingRpc { @Test(timeout = 60000) @SetupRpcConnection(setupRpcClient = false) public void testRequestTimeoutOnBusy() throws Exception { - RpcClientManager.RpcConnectionKey rpcConnectionKey = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + RpcConnectionKey rpcConnectionKey = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, false); + + // 500 millis socket timeout + Properties connParams = new Properties(); + connParams.setProperty(RpcConstants.CLIENT_RETRY_NUM, retries + ""); + connParams.setProperty(RpcConstants.CLIENT_SOCKET_TIMEOUT, String.valueOf(500)); + connParams.setProperty(RpcConstants.CLIENT_HANG_DETECTION, "true"); - //500 millis request timeout - BlockingRpcClient client = manager.newClient(rpcConnectionKey, retries, 500, TimeUnit.MILLISECONDS, true); + BlockingRpcClient client = manager.newClient(rpcConnectionKey, connParams); assertTrue(client.isConnected()); BlockingInterface stub = client.getStub(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4a22d1da/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java index 1053de6..160c6a3 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/test/java/org/apache/tajo/rpc/TestRpcClientManager.java @@ -25,10 +25,10 @@ import org.junit.Test; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.*; @@ -54,7 +54,7 @@ public class TestRpcClientManager { public void run() { NettyClientBase client = null; try { - client = manager.getClient(address, DummyProtocol.class, false); + client = manager.getClient(address, DummyProtocol.class, false, new Properties()); } catch (Throwable e) { fail(e.getMessage()); } @@ -68,7 +68,7 @@ public class TestRpcClientManager { future.get(); } - NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false); + NettyClientBase clientBase = manager.getClient(address, DummyProtocol.class, false, new Properties()); RpcClientManager.cleanup(clientBase); } finally { server.shutdown(); @@ -87,11 +87,11 @@ public class TestRpcClientManager { try { - NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true); + NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true, new Properties()); assertTrue(client.isConnected()); assertTrue(client.getChannel().isWritable()); - RpcClientManager.RpcConnectionKey key = client.getKey(); + RpcConnectionKey key = client.getKey(); assertTrue(RpcClientManager.contains(key)); client.close(); @@ -113,10 +113,10 @@ public class TestRpcClientManager { try { - NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true); + NettyClientBase client = manager.getClient(server.getListenAddress(), DummyProtocol.class, true, new Properties()); assertTrue(client.isConnected()); - RpcClientManager.RpcConnectionKey key = client.getKey(); + RpcConnectionKey key = client.getKey(); assertTrue(RpcClientManager.contains(key)); client.close(); @@ -144,17 +144,17 @@ public class TestRpcClientManager { NettyServerBase server = new AsyncRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 3); server.start(); - RpcClientManager.RpcConnectionKey key = - new RpcClientManager.RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); + RpcConnectionKey key = + new RpcConnectionKey(server.getListenAddress(), DummyProtocol.class, true); RpcClientManager.close(); RpcClientManager manager = RpcClientManager.getInstance(); try { - NettyClientBase client1 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false); + NettyClientBase client1 = manager.newClient(key, new Properties()); assertTrue(client1.isConnected()); assertFalse(RpcClientManager.contains(key)); - NettyClientBase client2 = manager.newClient(key, 0, 0, TimeUnit.SECONDS, false); + NettyClientBase client2 = manager.newClient(key, new Properties()); assertTrue(client2.isConnected()); assertFalse(RpcClientManager.contains(key));
