DRILL-1070: Update WorkManager to catch and log errors thrown in Foreman and Fragment threads instead of letting them propagate to the JVM.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/bfb422ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/bfb422ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/bfb422ed

Branch: refs/heads/master
Commit: bfb422ed63597b99e875d13e50764335a29a8a11
Parents: 16808f4
Author: Jacques Nadeau <jacq...@apache.org>
Authored: Sat Jun 21 11:25:19 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Wed Jun 25 09:09:17 2014 -0700

----------------------------------------------------------------------
 .../org/apache/drill/exec/work/WorkManager.java | 152 ++++++++++++-------
 1 file changed, 98 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bfb422ed/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index bddc1cf..7d5d37f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -25,7 +25,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.cache.DistributedCache;
@@ -33,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.NamedThreadFactory;
 import org.apache.drill.exec.rpc.control.Controller;
 import org.apache.drill.exec.rpc.control.WorkEventBus;
@@ -55,12 +56,13 @@ import com.codahale.metrics.MetricRegistry;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 
-public class WorkManager implements Closeable{
+public class WorkManager implements Closeable {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
 
-  private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps.<FragmentManager, Boolean> newConcurrentMap());
+  private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps
+      .<FragmentManager, Boolean> newConcurrentMap());
 
-  private PriorityBlockingQueue<Runnable> pendingTasks = Queues.newPriorityBlockingQueue();
+  private LinkedBlockingQueue<RunnableWrapper> pendingTasks = Queues.newLinkedBlockingQueue();
 
   private Map<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();
 
@@ -79,7 +81,7 @@ public class WorkManager implements Closeable{
   private ExecutorService executor;
   private final EventThread eventThread;
 
-  public WorkManager(BootStrapContext context){
+  public WorkManager(BootStrapContext context) {
     this.bee = new WorkerBee();
     this.workBus = new WorkEventBus(bee);
     this.bContext = context;
@@ -89,28 +91,31 @@ public class WorkManager implements Closeable{
     this.dataHandler = new DataResponseHandlerImpl(bee);
   }
 
-  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){
+  public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller,
+      DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider) {
     this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, cache, workBus, provider);
- //   executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
+    // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
     executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
     eventThread.start();
-    dContext.getMetrics().register(MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()),
+    dContext.getMetrics().register(
+        MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()),
         new Gauge<Integer>() {
-            @Override
-            public Integer getValue() {
-                return runningFragments.size();
-            }
+          @Override
+          public Integer getValue() {
+            return runningFragments.size();
+          }
         });
-    dContext.getMetrics().register(MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()),
+    dContext.getMetrics().register(
+        MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()),
         new Gauge<Integer>() {
-            @Override
-            public Integer getValue() {
-                return pendingTasks.size();
-            }
+          @Override
+          public Integer getValue() {
+            return pendingTasks.size();
+          }
         });
   }
 
-  public WorkEventBus getWorkBus(){
+  public WorkEventBus getWorkBus() {
     return workBus;
   }
 
@@ -118,11 +123,11 @@ public class WorkManager implements Closeable{
     return dataHandler;
   }
 
-  public ControlMessageHandler getControlMessageHandler(){
+  public ControlMessageHandler getControlMessageHandler() {
     return controlMessageWorker;
   }
 
-  public UserWorker getUserWorker(){
+  public UserWorker getUserWorker() {
     return userWorker;
   }
 
@@ -141,43 +146,55 @@ public class WorkManager implements Closeable{
     }
   }
 
-
   public DrillbitContext getContext() {
     return dContext;
   }
 
+  /**
+   * Returns the label used to identify a fragment's task in logs, in the form
+   * {@code FragmentExecutor: <queryId>:<majorFragmentId>:<minorFragmentId>}.
+   */
+  private static String getId(FragmentHandle handle) {
+    return "FragmentExecutor: " + QueryIdHelper.getQueryId(handle.getQueryId()) + ':'
+        + handle.getMajorFragmentId() + ':' + handle.getMinorFragmentId();
+  }
+
   // create this so items can see the data here whether or not they are in this package.
-  public class WorkerBee{
+  public class WorkerBee {
 
-    public void addFragmentRunner(FragmentExecutor runner){
+    public void addFragmentRunner(FragmentExecutor runner) {
       logger.debug("Adding pending task {}", runner);
-      pendingTasks.add(runner);
+      RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle()));
+      pendingTasks.add(wrapper);
     }
 
-    public void addNewForeman(Foreman foreman){
-      pendingTasks.add(foreman);
+    public void addNewForeman(Foreman foreman) {
+      String id = "Foreman: " + QueryIdHelper.getQueryId(foreman.getQueryId());
+      RunnableWrapper wrapper = new RunnableWrapper(foreman, id);
+      pendingTasks.add(wrapper);
       queries.put(foreman.getQueryId(), foreman);
     }
 
-
-    public void addFragmentPendingRemote(FragmentManager handler){
+    public void addFragmentPendingRemote(FragmentManager handler) {
       incomingFragments.add(handler);
     }
 
-    public void startFragmentPendingRemote(FragmentManager handler){
+    public void startFragmentPendingRemote(FragmentManager handler) {
       incomingFragments.remove(handler);
-      pendingTasks.add(handler.getRunnable());
+      FragmentExecutor runner = handler.getRunnable();
+      RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle()));
+      pendingTasks.add(wrapper);
     }
 
-    public FragmentExecutor getFragmentRunner(FragmentHandle handle){
+    public FragmentExecutor getFragmentRunner(FragmentHandle handle) {
       return runningFragments.get(handle);
     }
 
-    public Foreman getForemanForQueryId(QueryId queryId){
+    public Foreman getForemanForQueryId(QueryId queryId) {
       return queries.get(queryId);
     }
 
-    public void retireForeman(Foreman foreman){
+    public void retireForeman(Foreman foreman) {
       queries.remove(foreman.getQueryId(), foreman);
     }
 
@@ -187,34 +204,61 @@ public class WorkManager implements Closeable{
 
   }
 
+  /**
+   * Daemon thread that takes tasks off {@code pendingTasks} and hands each one to the
+   * executor until it is interrupted.
+   */
+  private class EventThread extends Thread {
+    public EventThread() {
+      this.setDaemon(true);
+      this.setName("WorkManager Event Thread");
+    }
 
-
- private class EventThread extends Thread{
-   public EventThread(){
-     this.setDaemon(true);
-     this.setName("WorkManager Event Thread");
-   }
-
-  @Override
-  public void run() {
-    try {
-    while(true){
-//      logger.debug("Polling for pending work tasks.");
-      Runnable r = pendingTasks.take();
-      if(r != null){
-        logger.debug("Starting pending task {}", r);
-        executor.execute(r);
+    @Override
+    public void run() {
+      try {
+        while (true) {
+          // take() blocks until a task is queued; it never returns null.
+          Runnable r = pendingTasks.take();
+          logger.debug("Starting pending task {}", r);
+          executor.execute(r);
+        }
+      } catch (InterruptedException e) {
+        logger.info("Work Manager stopping as it was interrupted.");
+        // Restore the interrupt flag rather than swallowing it.
+        Thread.currentThread().interrupt();
       }
-
-    }
-    } catch (InterruptedException e) {
-      logger.info("Work Manager stopping as it was interrupted.");
     }
   }
 
+  /**
+   * Wraps a Foreman or fragment task so that any Exception or Error it throws is logged with
+   * the task's id instead of propagating out of the executor thread.
+   */
+  private static class RunnableWrapper implements Runnable {
 
- }
+    private final Runnable inner;
+    private final String id;
 
+    public RunnableWrapper(Runnable r, String id) {
+      this.inner = r;
+      this.id = id;
+    }
 
+    @Override
+    public void run() {
+      try {
+        inner.run();
+      } catch (Exception | Error e) {
+        logger.error("Failure while running wrapper [{}]", id, e);
+      }
+    }
+
+    /** Returns the task id so that log lines printing this wrapper identify the task. */
+    @Override
+    public String toString() {
+      return id;
+    }
+  }
 
 }

Reply via email to