DRILL-384: Move some Fragment initialization work to run() method

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e0365158
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e0365158
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e0365158

Branch: refs/heads/master
Commit: e03651583c77d494e972c5ceb2bb442b6bcd26b4
Parents: af7f917
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Wed May 14 23:36:38 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Thu May 15 15:13:11 2014 -0700

----------------------------------------------------------------------
 .../exec/work/batch/ControlHandlerImpl.java      | 10 ++--------
 .../drill/exec/work/foreman/QueryManager.java    |  5 +----
 .../exec/work/fragment/FragmentExecutor.java     | 19 ++++++++++++++++---
 .../work/fragment/NonRootFragmentManager.java    | 12 ++++++------
 4 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index e7accba..e69bf3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -99,20 +99,14 @@ public class ControlHandlerImpl implements ControlMessageHandler {
   @Override
   public void startNewRemoteFragment(PlanFragment fragment) throws 
ExecutionSetupException{
     logger.debug("Received remote fragment start instruction", fragment);
-    FragmentContext context = new FragmentContext(bee.getContext(), fragment, 
null, new FunctionImplementationRegistry(bee.getContext().getConfig()));
+    FragmentContext context = new FragmentContext(bee.getContext(), fragment, 
null, bee.getContext().getFunctionImplementationRegistry());
     ControlTunnel tunnel = 
bee.getContext().getController().getTunnel(fragment.getForeman());
 
     NonRootStatusReporter listener = new NonRootStatusReporter(context, 
tunnel);
     try{
       FragmentRoot rootOperator = 
bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
-      RootExec exec = ImplCreator.getExec(context, rootOperator);
-      FragmentExecutor fr = new FragmentExecutor(context, exec, listener);
+      FragmentExecutor fr = new FragmentExecutor(context, rootOperator, 
listener);
       bee.addFragmentRunner(fr);
-
-    }catch(IOException e){
-      listener.fail(fragment.getHandle(), "Failure while parsing fragment 
execution plan.", e);
-    }catch(ExecutionSetupException e){
-      listener.fail(fragment.getHandle(), "Failure while setting up execution 
plan.", e);
     } catch (Exception e) {
       listener.fail(fragment.getHandle(), "Failure due to uncaught exception", 
e);
     } catch (OutOfMemoryError t) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index b305d0d..01b0df8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -89,13 +89,10 @@ public class QueryManager implements FragmentStatusListener{
       IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
       logger.debug("Setting buffers on root context.");
       rootContext.setBuffers(buffers);
-      logger.debug("Generating Exec tree");
-      RootExec rootExec = ImplCreator.getExec(rootContext, rootOperator);
-      logger.debug("Exec tree generated.");
       // add fragment to local node.
       map.put(rootFragment.getHandle(), new 
FragmentData(rootFragment.getHandle(), null, true));
       logger.debug("Fragment added to local node.");
-      rootRunner = new FragmentExecutor(rootContext, rootExec, new 
RootStatusHandler(rootContext, rootFragment));
+      rootRunner = new FragmentExecutor(rootContext, rootOperator, new 
RootStatusHandler(rootContext, rootFragment));
       RootFragmentManager fragmentManager = new 
RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
       
       if(buffers.isDone()){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 2323f52..7890fc9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,12 +17,17 @@
  */
 package org.apache.drill.exec.work.fragment;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.impl.ImplCreator;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus;
 import org.apache.drill.exec.proto.BitControl.FragmentStatus.FragmentState;
+import org.apache.drill.exec.proto.BitControl.PlanFragment;
 import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
 import org.apache.drill.exec.work.CancelableQuery;
@@ -38,13 +43,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
 
   private final AtomicInteger state = new 
AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
-  private final RootExec root;
+  private final FragmentRoot rootOperator;
+  private RootExec root;
   private final FragmentContext context;
   private final StatusReporter listener;
   
-  public FragmentExecutor(FragmentContext context, RootExec root, 
StatusReporter listener){
+  public FragmentExecutor(FragmentContext context, FragmentRoot rootOperator, 
StatusReporter listener){
     this.context = context;
-    this.root = root;
+    this.rootOperator = rootOperator;
     this.listener = listener;
   }
 
@@ -77,6 +83,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
     Thread.currentThread().setName(newThreadName);
     
     boolean closed = false;
+    try {
+      root = ImplCreator.getExec(context, rootOperator);
+    } catch (ExecutionSetupException e) {
+      context.fail(e);
+      return;
+    }
+
     logger.debug("Starting fragment runner. {}:{}", 
context.getHandle().getMajorFragmentId(), 
context.getHandle().getMinorFragmentId());
     if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, 
false)){
       internalFail(new RuntimeException(String.format("Run was called when 
fragment was in %s state.  FragmentRunnables should only be started when they 
are currently in awaiting allocation state.", 
FragmentState.valueOf(state.get()))));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e0365158/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 5cad658..c7c3439 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -55,7 +55,7 @@ public class NonRootFragmentManager implements FragmentManager {
     try{
       this.fragment = fragment;
       this.root = 
context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
-      this.context = new FragmentContext(context, fragment, null, new 
FunctionImplementationRegistry(context.getConfig()));
+      this.context = new FragmentContext(context, fragment, null, 
context.getFunctionImplementationRegistry());
       this.buffers = new IncomingBuffers(root, this.context);
       this.context.setBuffers(buffers);
       this.runnerListener = new NonRootStatusReporter(this.context, 
context.getController().getTunnel(fragment.getForeman()));
@@ -82,15 +82,15 @@ public class NonRootFragmentManager implements FragmentManager {
     synchronized(this){
       if(runner != null) throw new IllegalStateException("Get Runnable can 
only be run once.");
       if(cancel) return null;
+      FragmentRoot fragRoot = null;
       try {
-        FragmentRoot fragRoot = 
reader.readFragmentOperator(fragment.getFragmentJson());
-        RootExec exec = ImplCreator.getExec(context, fragRoot);
-        runner = new FragmentExecutor(context, exec, runnerListener);
-        return this.runner;
-      } catch (IOException | ExecutionSetupException e) {
+        fragRoot = reader.readFragmentOperator(fragment.getFragmentJson());
+      } catch (IOException e) {
         runnerListener.fail(fragment.getHandle(), "Failure while setting up 
remote fragment.", e);
         return null;
       }
+      runner = new FragmentExecutor(context, fragRoot, runnerListener);
+      return this.runner;
     }
 
   }

Reply via email to