DRILL-384: Move some Fragment initialization work to run() method
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0365158 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0365158 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0365158 Branch: refs/heads/master Commit: e03651583c77d494e972c5ceb2bb442b6bcd26b4 Parents: af7f917 Author: Steven Phillips <sphill...@maprtech.com> Authored: Wed May 14 23:36:38 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Thu May 15 15:13:11 2014 -0700 ---------------------------------------------------------------------- .../exec/work/batch/ControlHandlerImpl.java | 10 ++-------- .../drill/exec/work/foreman/QueryManager.java | 5 +---- .../exec/work/fragment/FragmentExecutor.java | 19 ++++++++++++++++--- .../work/fragment/NonRootFragmentManager.java | 12 ++++++------ 4 files changed, 25 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index e7accba..e69bf3a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -99,20 +99,14 @@ public class ControlHandlerImpl implements ControlMessageHandler { @Override public void startNewRemoteFragment(PlanFragment fragment) throws ExecutionSetupException{ logger.debug("Received remote fragment start instruction", fragment); - FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, new FunctionImplementationRegistry(bee.getContext().getConfig())); + FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, bee.getContext().getFunctionImplementationRegistry()); ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman()); NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel); try{ FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson()); - RootExec exec = ImplCreator.getExec(context, rootOperator); - FragmentExecutor fr = new FragmentExecutor(context, exec, listener); + FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener); bee.addFragmentRunner(fr); - - }catch(IOException e){ - listener.fail(fragment.getHandle(), "Failure while parsing fragment execution plan.", e); - }catch(ExecutionSetupException e){ - listener.fail(fragment.getHandle(), "Failure while setting up execution plan.", e); } catch (Exception e) { listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e); } catch (OutOfMemoryError t) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index b305d0d..01b0df8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -89,13 +89,10 @@ public class QueryManager implements FragmentStatusListener{ IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext); logger.debug("Setting buffers on root context."); rootContext.setBuffers(buffers); - logger.debug("Generating Exec tree"); - RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator); - logger.debug("Exec tree generated."); // add fragment to local node. map.put(rootFragment.getHandle(), new FragmentData(rootFragment.getHandle(), null, true)); logger.debug("Fragment added to local node."); - rootRunner = new FragmentExecutor(rootContext, rootExec, new RootStatusHandler(rootContext, rootFragment)); + rootRunner = new FragmentExecutor(rootContext, rootOperator, new RootStatusHandler(rootContext, rootFragment)); RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); if(buffers.isDone()){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 2323f52..7890fc9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -17,12 +17,17 @@ */ package org.apache.drill.exec.work.fragment; +import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState; +import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.work.CancelableQuery; @@ -38,13 +43,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class); private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE); - private final RootExec root; + private final FragmentRoot rootOperator; + private RootExec root; private final FragmentContext context; private final StatusReporter listener; - public FragmentExecutor(FragmentContext context, RootExec root, StatusReporter listener){ + public FragmentExecutor(FragmentContext context, FragmentRoot rootOperator, StatusReporter listener){ this.context = context; - this.root = root; + this.rootOperator = rootOperator; this.listener = listener; } @@ -77,6 +83,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid Thread.currentThread().setName(newThreadName); boolean closed = false; + try { + root = ImplCreator.getExec(context, rootOperator); + } catch (ExecutionSetupException e) { + context.fail(e); + return; + } + logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){ internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get())))); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 5cad658..c7c3439 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -55,7 +55,7 @@ public class NonRootFragmentManager implements FragmentManager { try{ this.fragment = fragment; this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson()); - this.context = new FragmentContext(context, fragment, null, new FunctionImplementationRegistry(context.getConfig())); + this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry()); this.buffers = new IncomingBuffers(root, this.context); this.context.setBuffers(buffers); this.runnerListener = new NonRootStatusReporter(this.context, context.getController().getTunnel(fragment.getForeman())); @@ -82,15 +82,15 @@ public class NonRootFragmentManager implements FragmentManager { synchronized(this){ if(runner != null) throw new IllegalStateException("Get Runnable can only be run once."); if(cancel) return null; + FragmentRoot fragRoot = null; try { - FragmentRoot fragRoot = reader.readFragmentOperator(fragment.getFragmentJson()); - RootExec exec = ImplCreator.getExec(context, fragRoot); - runner = new FragmentExecutor(context, exec, runnerListener); - return this.runner; - } catch (IOException | ExecutionSetupException e) { + fragRoot = reader.readFragmentOperator(fragment.getFragmentJson()); + } catch (IOException e) { runnerListener.fail(fragment.getHandle(), "Failure while setting up remote fragment.", e); return null; } + runner = new FragmentExecutor(context, fragRoot, runnerListener); + return this.runner; } }