DRILL-172: Support DrillClient reconnect on failure.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3f101aab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3f101aab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3f101aab

Branch: refs/heads/master
Commit: 3f101aabd05f174e8bfc9dbdd2590066a1e937f4
Parents: 63a90fe
Author: witwolf <[email protected]>
Authored: Tue Sep 3 19:56:32 2013 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Sep 3 20:11:50 2013 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/ExecConstants.java    |  3 +
 .../apache/drill/exec/client/DrillClient.java   | 84 +++++++++++++++-----
 .../org/apache/drill/exec/rpc/BasicClient.java  |  4 +
 .../drill/exec/rpc/CoordinationQueue.java       |  6 +-
 .../src/main/resources/drill-module.conf        |  7 +-
 .../src/test/resources/drill-module.conf        |  6 ++
 6 files changed, 90 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 5e7ddf0..5ecadff 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -24,6 +24,9 @@ public interface ExecConstants {
   public static final String ZK_TIMEOUT = "drill.exec.zk.timeout";
   public static final String ZK_ROOT = "drill.exec.zk.root";
   public static final String ZK_REFRESH = "drill.exec.zk.refresh";
+  public static final String BIT_RETRY_TIMES = "drill.exec.bit.retry.count";
+  public static final String BIT_RETRY_DELAY = "drill.exec.bit.retry.delay";
+  public static final String BIT_TIMEOUT = "drill.exec.bit.timeout" ;
   public static final String STORAGE_ENGINE_SCAN_PACKAGES = 
"drill.exec.storage.packages";
   public static final String SERVICE_NAME = "drill.exec.cluster-id";
   public static final String INITIAL_BIT_PORT = "drill.exec.rpc.bit.port";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 7f28dff..de16f64 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -32,16 +32,15 @@ import java.util.List;
 import java.util.Vector;
 
 import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.coord.ZKClusterCoordinator;
 import org.apache.drill.exec.memory.DirectBufferAllocator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserProtos;
 import org.apache.drill.exec.proto.UserProtos.QueryType;
+import org.apache.drill.exec.rpc.*;
 import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
-import org.apache.drill.exec.rpc.DrillRpcFuture;
-import org.apache.drill.exec.rpc.NamedThreadFactory;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
-import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
 import org.apache.drill.exec.rpc.user.UserClient;
 import org.apache.drill.exec.rpc.user.UserResultsListener;
@@ -60,6 +59,8 @@ public class DrillClient implements Closeable{
   private volatile ClusterCoordinator clusterCoordinator;
   private volatile boolean connected = false;
   private final DirectBufferAllocator allocator = new DirectBufferAllocator();
+  private int reconnectTimes;
+  private int reconnectDelay;
   
   public DrillClient() {
     this(DrillConfig.create());
@@ -76,6 +77,8 @@ public class DrillClient implements Closeable{
   public DrillClient(DrillConfig config, ClusterCoordinator coordinator){
     this.config = config;
     this.clusterCoordinator = coordinator;
+    this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
+    this.reconnectDelay = config.getInt(ExecConstants.BIT_RETRY_DELAY);
   }
   
   public DrillConfig getConfig(){
@@ -88,36 +91,60 @@ public class DrillClient implements Closeable{
    * @throws IOException
    */
   public synchronized void connect() throws RpcException {
-    if(connected) return;
-    
-    if(clusterCoordinator == null){
+    if (connected) return;
+
+    if (clusterCoordinator == null) {
       try {
         this.clusterCoordinator = new ZKClusterCoordinator(this.config);
         this.clusterCoordinator.start(10000);
       } catch (Exception e) {
         throw new RpcException("Failure setting up ZK for client.", e);
       }
-      
     }
-    
+
     Collection<DrillbitEndpoint> endpoints = 
clusterCoordinator.getAvailableEndpoints();
     checkState(!endpoints.isEmpty(), "No DrillbitEndpoint can be found");
     // just use the first endpoint for now
     DrillbitEndpoint endpoint = endpoints.iterator().next();
     this.client = new UserClient(allocator.getUnderlyingAllocator(), new 
NioEventLoopGroup(1, new NamedThreadFactory("Client-")));
+    logger.debug("Connecting to server {}:{}", endpoint.getAddress(), 
endpoint.getUserPort());
+    connect(endpoint);
+    connected = true;
+  }
+
+
+  public synchronized boolean reconnect() {
+    if (client.isActive()) {
+      return true;
+    }
+    int retry = reconnectTimes;
+    while (retry > 0) {
+      retry--;
+      try {
+        Thread.sleep(this.reconnectDelay);
+        Collection<DrillbitEndpoint> endpoints = 
clusterCoordinator.getAvailableEndpoints();
+        if (endpoints.isEmpty()) {
+          continue;
+        }
+        client.close();
+        connect(endpoints.iterator().next());
+        return true;
+      } catch (Exception e) {
+      }
+    }
+    return false;
+  }
+
+  private void connect(DrillbitEndpoint endpoint) throws RpcException {
+    FutureHandler f = new FutureHandler();
     try {
-      logger.debug("Connecting to server {}:{}", endpoint.getAddress(), 
endpoint.getUserPort());
-      FutureHandler f = new FutureHandler();
-      this.client.connect(f, endpoint);
+      client.connect(f, endpoint);
       f.checkedGet();
-      connected = true;
     } catch (InterruptedException e) {
       throw new RpcException(e);
     }
   }
 
-  
-  
   public DirectBufferAllocator getAllocator() {
     return allocator;
   }
@@ -138,8 +165,9 @@ public class DrillClient implements Closeable{
    * @throws RpcException
    */
   public List<QueryResultBatch> runQuery(QueryType type, String plan) throws 
RpcException {
-    ListHoldingResultsListener listener = new ListHoldingResultsListener();
-    client.submitQuery(listener, 
newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
+    UserProtos.RunQuery query = 
newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build() ;
+    ListHoldingResultsListener listener = new 
ListHoldingResultsListener(query);
+    client.submitQuery(listener,query);
     return listener.getResults();
   }
   
@@ -157,9 +185,29 @@ public class DrillClient implements Closeable{
   private class ListHoldingResultsListener implements UserResultsListener {
     private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
     private SettableFuture<List<QueryResultBatch>> future = 
SettableFuture.create();
-    
+    private UserProtos.RunQuery query ;
+
+    public ListHoldingResultsListener(UserProtos.RunQuery query) {
+      this.query = query;
+    }
+
     @Override
     public void submissionFailed(RpcException ex) {
+      // or  !client.isActive()
+      if (ex instanceof ChannelClosedException) {
+        if (reconnect()) {
+          try {
+            client.submitQuery(this, query);
+          } catch (Exception e) {
+            fail(e);
+          }
+        } else {
+          fail(ex);
+        }
+      }
+    }
+
+    private void fail(Exception ex) {
       logger.debug("Submission failed.", ex);
       future.setException(ex);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index b4ac993..23dc486 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -83,6 +83,10 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
     ;
   }
 
+  public boolean isActive(){
+    return connection.getChannel().isActive() ;
+  }
+
   protected abstract void validateHandshake(HANDSHAKE_RESPONSE 
validateHandshake) throws RpcException;
   protected abstract void finalizeConnection(HANDSHAKE_RESPONSE handshake, R 
connection);
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
index 4b7d611..6b4351a 100644
--- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
+++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/CoordinationQueue.java
@@ -81,7 +81,11 @@ public class CoordinationQueue {
       connection.releasePermit();
       if(!future.isSuccess()){
         removeFromMap(coordinationId);
-        future.get();
+        if(future.channel().isActive()) {
+           throw new RpcException("Future failed") ;
+        }else{
+          throw  new ChannelClosedException();
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
index c0735cc..c6e04ff 100644
--- a/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/main/resources/drill-module.conf
@@ -35,7 +35,12 @@ drill.exec: {
        }    
   },
   functions: ["org.apache.drill.expr.fn.impl"],
-  
+  bit: {
+    retry:{
+       count: 7200,
+       delay: 500
+    }
+  } ,  
   network: {
     start: 35000
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3f101aab/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
index 4829d34..744b786 100644
--- a/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
+++ b/sandbox/prototype/exec/java-exec/src/test/resources/drill-module.conf
@@ -29,6 +29,12 @@ drill.exec: {
          delay: 500
        }    
   }
+  bit: {
+    retry:{
+       count: 7200,
+       delay: 500
+    }
+  } ,
 
   network: {
     start: 35000

Reply via email to