DRILL-1070: Update work manager to capture errors in Foreman and Fragment threads rather than letting them drop through to the JVM.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/bfb422ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/bfb422ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/bfb422ed Branch: refs/heads/master Commit: bfb422ed63597b99e875d13e50764335a29a8a11 Parents: 16808f4 Author: Jacques Nadeau <jacq...@apache.org> Authored: Sat Jun 21 11:25:19 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Wed Jun 25 09:09:17 2014 -0700 ---------------------------------------------------------------------- .../org/apache/drill/exec/work/WorkManager.java | 138 +++++++++++-------- 1 file changed, 84 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/bfb422ed/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index bddc1cf..7d5d37f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -25,7 +25,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.drill.exec.cache.DistributedCache; @@ -33,6 +33,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import 
org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.NamedThreadFactory; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.control.WorkEventBus; @@ -55,12 +56,13 @@ import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Maps; import com.google.common.collect.Queues; -public class WorkManager implements Closeable{ +public class WorkManager implements Closeable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class); - private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps.<FragmentManager, Boolean> newConcurrentMap()); + private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps + .<FragmentManager, Boolean> newConcurrentMap()); - private PriorityBlockingQueue<Runnable> pendingTasks = Queues.newPriorityBlockingQueue(); + private LinkedBlockingQueue<RunnableWrapper> pendingTasks = Queues.newLinkedBlockingQueue(); private Map<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap(); @@ -79,7 +81,7 @@ public class WorkManager implements Closeable{ private ExecutorService executor; private final EventThread eventThread; - public WorkManager(BootStrapContext context){ + public WorkManager(BootStrapContext context) { this.bee = new WorkerBee(); this.workBus = new WorkEventBus(bee); this.bContext = context; @@ -89,28 +91,31 @@ public class WorkManager implements Closeable{ this.dataHandler = new DataResponseHandlerImpl(bee); } - public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider){ + public void start(DrillbitEndpoint endpoint, DistributedCache cache, Controller controller, + DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider) { this.dContext = new DrillbitContext(endpoint, bContext, 
coord, controller, data, cache, workBus, provider); - // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) + // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-")); eventThread.start(); - dContext.getMetrics().register(MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()), + dContext.getMetrics().register( + MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()), new Gauge<Integer>() { - @Override - public Integer getValue() { - return runningFragments.size(); - } + @Override + public Integer getValue() { + return runningFragments.size(); + } }); - dContext.getMetrics().register(MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()), + dContext.getMetrics().register( + MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()), new Gauge<Integer>() { - @Override - public Integer getValue() { - return pendingTasks.size(); - } + @Override + public Integer getValue() { + return pendingTasks.size(); + } }); } - public WorkEventBus getWorkBus(){ + public WorkEventBus getWorkBus() { return workBus; } @@ -118,11 +123,11 @@ public class WorkManager implements Closeable{ return dataHandler; } - public ControlMessageHandler getControlMessageHandler(){ + public ControlMessageHandler getControlMessageHandler() { return controlMessageWorker; } - public UserWorker getUserWorker(){ + public UserWorker getUserWorker() { return userWorker; } @@ -141,43 +146,52 @@ public class WorkManager implements Closeable{ } } - public DrillbitContext getContext() { return dContext; } + private static String getId(FragmentHandle handle){ + return "FragmentExecutor: " + QueryIdHelper.getQueryId(handle.getQueryId()) + ':' + handle.getMajorFragmentId() + ':' + 
handle.getMinorFragmentId(); + } + // create this so items can see the data here whether or not they are in this package. - public class WorkerBee{ + public class WorkerBee { + + - public void addFragmentRunner(FragmentExecutor runner){ + public void addFragmentRunner(FragmentExecutor runner) { logger.debug("Adding pending task {}", runner); - pendingTasks.add(runner); + RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle())); + pendingTasks.add(wrapper); } - public void addNewForeman(Foreman foreman){ - pendingTasks.add(foreman); + public void addNewForeman(Foreman foreman) { + String id = "Foreman: " + QueryIdHelper.getQueryId(foreman.getQueryId()); + RunnableWrapper wrapper = new RunnableWrapper(foreman, id); + pendingTasks.add(wrapper); queries.put(foreman.getQueryId(), foreman); } - - public void addFragmentPendingRemote(FragmentManager handler){ + public void addFragmentPendingRemote(FragmentManager handler) { incomingFragments.add(handler); } - public void startFragmentPendingRemote(FragmentManager handler){ + public void startFragmentPendingRemote(FragmentManager handler) { incomingFragments.remove(handler); - pendingTasks.add(handler.getRunnable()); + FragmentExecutor runner = handler.getRunnable(); + RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle())); + pendingTasks.add(wrapper); } - public FragmentExecutor getFragmentRunner(FragmentHandle handle){ + public FragmentExecutor getFragmentRunner(FragmentHandle handle) { return runningFragments.get(handle); } - public Foreman getForemanForQueryId(QueryId queryId){ + public Foreman getForemanForQueryId(QueryId queryId) { return queries.get(queryId); } - public void retireForeman(Foreman foreman){ + public void retireForeman(Foreman foreman) { queries.remove(foreman.getQueryId(), foreman); } @@ -187,34 +201,50 @@ public class WorkManager implements Closeable{ } + private class EventThread extends Thread { + public EventThread() { + 
this.setDaemon(true); + this.setName("WorkManager Event Thread"); + } - - private class EventThread extends Thread{ - public EventThread(){ - this.setDaemon(true); - this.setName("WorkManager Event Thread"); - } - - @Override - public void run() { - try { - while(true){ -// logger.debug("Polling for pending work tasks."); - Runnable r = pendingTasks.take(); - if(r != null){ - logger.debug("Starting pending task {}", r); - executor.execute(r); + @Override + public void run() { + try { + while (true) { + // logger.debug("Polling for pending work tasks."); + Runnable r = pendingTasks.take(); + if (r != null) { + logger.debug("Starting pending task {}", r); + executor.execute(r); + } + + } + } catch (InterruptedException e) { + logger.info("Work Manager stopping as it was interrupted."); } - - } - } catch (InterruptedException e) { - logger.info("Work Manager stopping as it was interrupted."); } + } + private class RunnableWrapper implements Runnable { - } + private final Runnable inner; + private final String id; + public RunnableWrapper(Runnable r, String id){ + this.inner = r; + this.id = id; + } + @Override + public void run() { + try{ + inner.run(); + }catch(Exception | Error e){ + logger.error("Failure while running wrapper [{}]", id, e); + } + } + + } }