DRILL-172: Support DrillClient reconnect on failure.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3f101aab Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3f101aab Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3f101aab Branch: refs/heads/master Commit: 3f101aabd05f174e8bfc9dbdd2590066a1e937f4 Parents: 63a90fe Author: witwolf <[email protected]> Authored: Tue Sep 3 19:56:32 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Tue Sep 3 20:11:50 2013 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 3 + .../apache/drill/exec/client/DrillClient.java | 84 +++++++++++++++----- .../org/apache/drill/exec/rpc/BasicClient.java | 4 + .../drill/exec/rpc/CoordinationQueue.java | 6 +- .../src/main/resources/drill-module.conf | 7 +- .../src/test/resources/drill-module.conf | 6 ++ 6 files changed, 90 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 5e7ddf0..5ecadff 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -24,6 +24,9 @@ public interface ExecConstants { public static final String ZK_TIMEOUT = "drill.exec.zk.timeout"; public static final String ZK_ROOT = "drill.exec.zk.root"; public static final String ZK_REFRESH = "drill.exec.zk.refresh"; + public static final String BIT_RETRY_TIMES = "drill.exec.bit.retry.count"; + public static final String BIT_RETRY_DELAY = "drill.exec.bit.retry.delay"; + public static final String BIT_TIMEOUT = "drill.exec.bit.timeout" ; public static final String STORAGE_ENGINE_SCAN_PACKAGES = "drill.exec.storage.packages"; public static final String SERVICE_NAME = "drill.exec.cluster-id"; public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.port"; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 7f28dff..de16f64 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -32,16 +32,15 @@ import java.util.List; import java.util.Vector; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ZKClusterCoordinator; import org.apache.drill.exec.memory.DirectBufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.proto.UserProtos.QueryType; +import org.apache.drill.exec.rpc.*; import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection; -import org.apache.drill.exec.rpc.DrillRpcFuture; -import org.apache.drill.exec.rpc.NamedThreadFactory; -import org.apache.drill.exec.rpc.RpcConnectionHandler; -import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserClient; import org.apache.drill.exec.rpc.user.UserResultsListener; @@ -60,6 +59,8 @@ public class DrillClient implements Closeable{ private volatile ClusterCoordinator clusterCoordinator; private volatile boolean connected = false; private final DirectBufferAllocator allocator = new DirectBufferAllocator(); + private int reconnectTimes; + private int reconnectDelay; public DrillClient() { this(DrillConfig.create()); @@ -76,6 +77,8 @@ public class DrillClient implements Closeable{ public DrillClient(DrillConfig config, ClusterCoordinator coordinator){ this.config = config; this.clusterCoordinator = coordinator; + this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); + this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY); } public DrillConfig getConfig(){ @@ -88,36 +91,60 @@ public class DrillClient implements Closeable{ * @throws IOException */ public synchronized void connect() throws RpcException { - if(connected) return; - - if(clusterCoordinator == null){ + if (connected) return; + + if (clusterCoordinator == null) { try { this.clusterCoordinator = new ZKClusterCoordinator(this.config); this.clusterCoordinator.start(10000); } catch (Exception e) { throw new RpcException("Failure setting up ZK for client.", e); } - } - + Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints(); checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found"); // just use the first endpoint for now DrillbitEndpoint endpoint = endpoints.iterator().next(); this.client = new UserClient(allocator.getUnderlyingAllocator(), new NioEventLoopGroup(1, new NamedThreadFactory("Client-"))); + logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); + connect(endpoint); + connected = true; + } + + + public synchronized boolean reconnect() { + if (client.isActive()) { + return true; + } + int retry = reconnectTimes; + while (retry > 0) { + retry--; + try { + Thread.sleep(this.reconnectDelay); + Collection<DrillbitEndpoint> endpoints = clusterCoordinator.getAvailableEndpoints(); + if (endpoints.isEmpty()) { + continue; + } + client.close(); + connect(endpoints.iterator().next()); + return true; + } catch (Exception e) { + } + } + return false; + } + + private void connect(DrillbitEndpoint endpoint) throws RpcException { + FutureHandler f = new FutureHandler(); try { - logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); - FutureHandler f = new FutureHandler(); - this.client.connect(f, endpoint); + client.connect(f, endpoint); f.checkedGet(); - connected = true; } catch (InterruptedException e) { throw new RpcException(e); } } - - public DirectBufferAllocator getAllocator() { return allocator; } @@ -138,8 +165,9 @@ public class DrillClient implements Closeable{ * @throws RpcException */ public List<QueryResultBatch> runQuery(QueryType type, String plan) throws RpcException { - ListHoldingResultsListener listener = new ListHoldingResultsListener(); - client.submitQuery(listener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build()); + UserProtos.RunQuery query = newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build() ; + ListHoldingResultsListener listener = new ListHoldingResultsListener(query); + client.submitQuery(listener,query); return listener.getResults(); } @@ -157,9 +185,29 @@ public class DrillClient implements Closeable{ private class ListHoldingResultsListener implements UserResultsListener { private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>(); private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create(); - + private UserProtos.RunQuery query ; + + public ListHoldingResultsListener(UserProtos.RunQuery query) { + this.query = query; + } + @Override public void submissionFailed(RpcException ex) { + // or !client.isActive() + if (ex instanceof ChannelClosedException) { + if (reconnect()) { + try { + client.submitQuery(this, query); + } catch (Exception e) { + fail(e); + } + } else { + fail(ex); + } + } + } + + private void fail(Exception ex) { logger.debug("Submission failed.", ex); future.setException(ex); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java index b4ac993..23dc486 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java @@ -83,6 +83,10 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection ; } + public boolean isActive(){ + return connection.getChannel().isActive() ; + } + protected abstract void validateHandshake(HANDSHAKE_RESPONSE validateHandshake) throws RpcException; protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R connection); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java index 4b7d611..6b4351a 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java @@ -81,7 +81,11 @@ public class CoordinationQueue { connection.releasePermit(); if(!future.isSuccess()){ removeFromMap(coordinationId); - future.get(); + if(future.channel().isActive()) { + throw new RpcException("Future failed") ; + }else{ + throw new ChannelClosedException(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf index c0735cc..c6e04ff 100644 --- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf +++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf @@ -35,7 +35,12 @@ drill.exec: { } }, functions: ["org.apache.drill.expr.fn.impl"], - + bit: { + retry:{ + count: 7200, + delay: 500 + } + } , network: { start: 35000 } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf index 4829d34..744b786 100644 --- a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf +++ b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf @@ -29,6 +29,12 @@ drill.exec: { delay: 500 } } + bit: { + retry:{ + count: 7200, + delay: 500 + } + } , network: { start: 35000
