DRILL-1595: Send intermediate fragments before leaf fragments
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/fdcedf97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/fdcedf97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/fdcedf97 Branch: refs/heads/master Commit: fdcedf972d30a5d9fc3e1bcaa364d9634235b82b Parents: 81e9325 Author: Steven Phillips <sphill...@maprtech.com> Authored: Mon Oct 27 23:29:18 2014 -0700 Committer: Steven Phillips <sphill...@maprtech.com> Committed: Tue Oct 28 16:40:55 2014 -0700 ---------------------------------------------------------------------- .../drill/exec/work/foreman/QueryManager.java | 45 ++++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/fdcedf97/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 61f3d82..b60d8a9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -121,20 +122,37 @@ public class QueryManager implements FragmentStatusListener{ } } - Multimap<DrillbitEndpoint, PlanFragment> fragmentMap = ArrayListMultimap.create(); + Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); + Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); // record all fragments for status purposes. for (PlanFragment f : nonRootFragments) { logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson()); status.add(new FragmentData(f.getHandle(), f.getAssignment(), false)); - fragmentMap.put(f.getAssignment(), f); + if (f.getLeafFragment()) { + leafFragmentMap.put(f.getAssignment(), f); + } else { + intFragmentMap.put(f.getAssignment(), f); + } } + CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size()); + + // send remote intermediate fragments + for (DrillbitEndpoint ep : intFragmentMap.keySet()) { + sendRemoteFragments(ep, intFragmentMap.get(ep), latch); + } + // wait for send complete + try { + latch.await(); + } catch (InterruptedException e) { + throw new ExecutionSetupException(e); + } // send remote (leaf) fragments. - for (DrillbitEndpoint ep : fragmentMap.keySet()) { - sendRemoteFragments(ep, fragmentMap.get(ep)); + for (DrillbitEndpoint ep : leafFragmentMap.keySet()) { + sendRemoteFragments(ep, leafFragmentMap.get(ep), null); } bee.getContext().getAllocator().resetFragmentLimits(); @@ -146,7 +164,7 @@ public class QueryManager implements FragmentStatusListener{ } } - private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments){ + private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){ InitializeFragments.Builder fb = InitializeFragments.newBuilder(); for(PlanFragment f : fragments){ fb.addFragment(f); @@ -154,7 +172,7 @@ public class QueryManager implements FragmentStatusListener{ InitializeFragments initFrags = fb.build(); logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags); - FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags); + FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch); controller.getTunnel(assignment).sendFragments(listener, initFrags); } @@ -269,16 +287,25 @@ public class QueryManager implements FragmentStatusListener{ } - public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value){ - return new FragmentSubmitListener(endpoint, value); + public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){ + return new FragmentSubmitListener(endpoint, value, latch); } private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{ - public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value) { + private CountDownLatch latch; + + public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) { super(endpoint, value); + this.latch = latch; } + @Override + public void success(Ack ack, ByteBuf byteBuf) { + if (latch != null) { + latch.countDown(); + } + } @Override public void failed(RpcException ex) {