DRILL-1595: Send intermediate fragments before leaf fragments

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fdcedf97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fdcedf97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fdcedf97

Branch: refs/heads/master
Commit: fdcedf972d30a5d9fc3e1bcaa364d9634235b82b
Parents: 81e9325
Author: Steven Phillips <sphill...@maprtech.com>
Authored: Mon Oct 27 23:29:18 2014 -0700
Committer: Steven Phillips <sphill...@maprtech.com>
Committed: Tue Oct 28 16:40:55 2014 -0700

----------------------------------------------------------------------
 .../drill/exec/work/foreman/QueryManager.java   | 45 ++++++++++++++++----
 1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fdcedf97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 61f3d82..b60d8a9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -121,20 +122,37 @@ public class QueryManager implements FragmentStatusListener{
       }
     }
 
-    Multimap<DrillbitEndpoint, PlanFragment> fragmentMap = ArrayListMultimap.create();
+    Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
+    Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
 
     // record all fragments for status purposes.
     for (PlanFragment f : nonRootFragments) {
       logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
       status.add(new FragmentData(f.getHandle(), f.getAssignment(), false));
-      fragmentMap.put(f.getAssignment(), f);
+      if (f.getLeafFragment()) {
+        leafFragmentMap.put(f.getAssignment(), f);
+      } else {
+        intFragmentMap.put(f.getAssignment(), f);
+      }
     }
 
+    CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size());
+
+    // send remote intermediate fragments
+    for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
+      sendRemoteFragments(ep, intFragmentMap.get(ep), latch);
+    }
 
+    // wait for send complete
+    try {
+      latch.await();
+    } catch (InterruptedException e) {
+      throw new ExecutionSetupException(e);
+    }
 
     // send remote (leaf) fragments.
-    for (DrillbitEndpoint ep : fragmentMap.keySet()) {
-      sendRemoteFragments(ep, fragmentMap.get(ep));
+    for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+      sendRemoteFragments(ep, leafFragmentMap.get(ep), null);
     }
 
     bee.getContext().getAllocator().resetFragmentLimits();
@@ -146,7 +164,7 @@ public class QueryManager implements FragmentStatusListener{
     }
   }
 
-  private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments){
+  private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){
     InitializeFragments.Builder fb = InitializeFragments.newBuilder();
     for(PlanFragment f : fragments){
       fb.addFragment(f);
@@ -154,7 +172,7 @@ public class QueryManager implements FragmentStatusListener{
     InitializeFragments initFrags = fb.build();
 
     logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
-    FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags);
+    FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch);
     controller.getTunnel(assignment).sendFragments(listener, initFrags);
   }
 
@@ -269,16 +287,25 @@ public class QueryManager implements FragmentStatusListener{
 
   }
 
-  public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value){
-    return new FragmentSubmitListener(endpoint, value);
+  public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){
+    return new FragmentSubmitListener(endpoint, value, latch);
   }
 
   private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{
 
-    public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value) {
+    private CountDownLatch latch;
+
+    public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) {
       super(endpoint, value);
+      this.latch = latch;
     }
 
+    @Override
+    public void success(Ack ack, ByteBuf byteBuf) {
+      if (latch != null) {
+        latch.countDown();
+      }
+    }
 
     @Override
     public void failed(RpcException ex) {

Reply via email to