DRILL-855: Improve work assignment parallelization
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/d929faac Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/d929faac Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/d929faac Branch: refs/heads/master Commit: d929faace270302c5e00e272775be98d5a8f83e1 Parents: 292765e Author: Aditya Kishore <[email protected]> Authored: Wed May 28 04:56:46 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon Jun 2 09:14:42 2014 -0700 ---------------------------------------------------------------------- distribution/src/resources/drill-override.conf | 1 + .../org/apache/drill/exec/ExecConstants.java | 1 + .../drill/exec/physical/EndpointAffinity.java | 20 ++++-- .../drill/exec/planner/SimpleExecPlanner.java | 8 +-- .../planner/fragment/SimpleParallelizer.java | 47 ++++++++++---- .../drill/exec/planner/fragment/Wrapper.java | 68 +++++++++++--------- .../apache/drill/exec/work/foreman/Foreman.java | 29 ++------- .../src/main/resources/drill-module.conf | 16 +++++ .../drill/exec/pop/TestFragmentChecker.java | 4 +- 9 files changed, 116 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/distribution/src/resources/drill-override.conf ---------------------------------------------------------------------- diff --git a/distribution/src/resources/drill-override.conf b/distribution/src/resources/drill-override.conf index a9316a9..da3d094 100644 --- a/distribution/src/resources/drill-override.conf +++ b/distribution/src/resources/drill-override.conf @@ -87,6 +87,7 @@ drill.exec: { work: { max.width.per.endpoint: 5, global.max.width: 100, + affinity.factor: 1.2, executor.threads: 4 }, trace: { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index d9e0833..e66e93c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -44,6 +44,7 @@ public interface ExecConstants { public static final String GLOBAL_MAX_WIDTH = "drill.exec.work.global.max.width"; public static final String MAX_WIDTH_PER_ENDPOINT = "drill.exec.work.max.width.per.endpoint"; public static final String EXECUTOR_THREADS = "drill.exec.work.executor.threads"; + public static final String AFFINITY_FACTOR = "drill.exec.work.affinity.factor"; public static final String CLIENT_RPC_THREADS = "drill.exec.rpc.user.client.threads"; public static final String BIT_SERVER_RPC_THREADS = "drill.exec.rpc.bit.server.threads"; public static final String USER_SERVER_RPC_THREADS = "drill.exec.rpc.user.server.threads"; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java index f3059ae..df31f74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/EndpointAffinity.java @@ -19,18 +19,20 @@ package org.apache.drill.exec.physical; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import com.google.protobuf.TextFormat; + public class EndpointAffinity implements Comparable<EndpointAffinity>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EndpointAffinity.class); - + private DrillbitEndpoint endpoint; private float affinity = 0.0f; - + public EndpointAffinity(DrillbitEndpoint endpoint) { super(); this.endpoint = endpoint; } - + public EndpointAffinity(DrillbitEndpoint endpoint, float affinity) { super(); this.endpoint = endpoint; @@ -46,15 +48,19 @@ public class EndpointAffinity implements Comparable<EndpointAffinity>{ public float getAffinity() { return affinity; } - + @Override public int compareTo(EndpointAffinity o) { return Float.compare(affinity, o.affinity); } - + public void addAffinity(float f){ affinity += f; } - - + + @Override + public String toString() { + return "EndpointAffinity [endpoint=" + TextFormat.shortDebugString(endpoint) + ", affinity=" + affinity + "]"; + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java index 4da6500..132fde7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimpleExecPlanner.java @@ -51,9 +51,9 @@ public class SimpleExecPlanner implements ExecPlanner{ int maxWidthPerEndpoint = context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT); - return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(), context.getQueryId(), context.getActiveEndpoints(), - context.getPlanReader(), fragmentRoot, planningSet, maxWidth, maxWidthPerEndpoint); - - + parallelizer.setGlobalMaxWidth(maxWidth).setMaxWidthPerEndpoint(maxWidthPerEndpoint); + return parallelizer.getFragments(new OptionList(), context.getCurrentEndpoint(), + context.getQueryId(), context.getActiveEndpoints(), context.getPlanReader(), fragmentRoot, planningSet); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 313a81d..d226b08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -51,6 +51,36 @@ public class SimpleParallelizer { private final Materializer materializer = new Materializer(); /** + * The maximum level or parallelization any stage of the query can do. Note that while this + * might be the number of active Drillbits, realistically, this could be well beyond that + * number of we want to do things like speed results return. + */ + private int globalMaxWidth; + public SimpleParallelizer setGlobalMaxWidth(int globalMaxWidth) { + this.globalMaxWidth = globalMaxWidth; + return this; + } + + /** + * Limits the maximum level of parallelization to this factor time the number of Drillbits + */ + private int maxWidthPerEndpoint; + public SimpleParallelizer setMaxWidthPerEndpoint(int maxWidthPerEndpoint) { + this.maxWidthPerEndpoint = maxWidthPerEndpoint; + return this; + } + + + /** + * Factor by which a node with endpoint affinity will be favored while creating assignment + */ + private double affinityFactor = 1.2f; + public SimpleParallelizer setAffinityFactor(double affinityFactor) { + this.affinityFactor = affinityFactor; + return this; + } + + /** * Generate a set of assigned fragments based on the provided planningSet. Do not allow parallelization stages to go * beyond the global max width. * @@ -60,16 +90,12 @@ public class SimpleParallelizer { * @param reader Tool used to read JSON plans * @param rootNode The root node of the PhysicalPlan that we will parallelizing. * @param planningSet The set of queries with collected statistics that we'll work with. - * @param globalMaxWidth The maximum level or parallelization any stage of the query can do. Note that while this - * might be the number of active Drillbits, realistically, this could be well beyond that - * number of we want to do things like speed results return. - * @param maxWidthPerEndpoint Limits the maximum level of parallelization to this factor time the number of Drillbits * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes. * @throws ExecutionSetupException */ - public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, - int globalMaxWidth, int maxWidthPerEndpoint) throws ExecutionSetupException { - assignEndpoints(activeEndpoints, planningSet, globalMaxWidth, maxWidthPerEndpoint); + public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, Collection<DrillbitEndpoint> activeEndpoints, + PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet) throws ExecutionSetupException { + assignEndpoints(activeEndpoints, planningSet); return generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet); } @@ -152,11 +178,9 @@ public class SimpleParallelizer { } return new QueryWorkUnit(rootOperator, rootFragment, fragments); - } - private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet, - int globalMaxWidth, int maxWidthPerEndpoint) throws PhysicalOperatorSetupException { + private void assignEndpoints(Collection<DrillbitEndpoint> allNodes, PlanningSet planningSet) throws PhysicalOperatorSetupException { // First we determine the amount of parallelization for a fragment. This will be between 1 and maxWidth based on // cost. (Later could also be based on cluster operation.) then we decide endpoints based on affinity (later this // could be based on endpoint load) @@ -181,7 +205,8 @@ public class SimpleParallelizer { // logger.debug("Setting width {} on fragment {}", width, wrapper); wrapper.setWidth(width); // figure out endpoint assignments. also informs the exchanges about their respective endpoints. - wrapper.assignEndpoints(allNodes); + wrapper.assignEndpoints(allNodes, affinityFactor); } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 8602bf0..38cba09 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -19,20 +19,24 @@ package org.apache.drill.exec.planner.fragment; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; -import org.apache.commons.lang.NotImplementedException; import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; -import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.physical.EndpointAffinity; -import org.apache.drill.exec.physical.base.*; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.Store; +import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -47,7 +51,7 @@ public class Wrapper { private int width = -1; private final Stats stats; private boolean endpointsAssigned; - private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap(); + private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinityMap = Maps.newHashMap(); private long initialAllocation = 0; private long maxAllocation = 0; @@ -71,14 +75,14 @@ public class Wrapper { addEndpointAffinity(ea.getEndpoint(), ea.getAffinity()); } } - + public void addEndpointAffinity(DrillbitEndpoint endpoint, float affinity) { Preconditions.checkState(!endpointsAssigned); Preconditions.checkNotNull(endpoint); - EndpointAffinity ea = endpointAffinity.get(endpoint); + EndpointAffinity ea = endpointAffinityMap.get(endpoint); if (ea == null) { ea = new EndpointAffinity(endpoint); - endpointAffinity.put(endpoint, ea); + endpointAffinityMap.put(endpoint, ea); } ea.addAffinity(affinity); @@ -116,7 +120,6 @@ public class Wrapper { private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{ - @Override public Void visitExchange(Exchange exchange, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException { if(exchange == node.getSendingExchange()){ @@ -148,40 +151,42 @@ public class Wrapper { public Void visitOp(PhysicalOperator op, List<DrillbitEndpoint> value) throws PhysicalOperatorSetupException { return visitChildren(op, value); } - + } - - public void assignEndpoints(Collection<DrillbitEndpoint> allPossible) throws PhysicalOperatorSetupException { - Preconditions.checkState(!endpointsAssigned); + public void assignEndpoints(Collection<DrillbitEndpoint> allEndpoints, double affinityFactor) throws PhysicalOperatorSetupException { + Preconditions.checkState(!endpointsAssigned); endpointsAssigned = true; - List<EndpointAffinity> values = Lists.newArrayList(); - values.addAll(endpointAffinity.values()); - - if (values.size() == 0) { - List<DrillbitEndpoint> all = Lists.newArrayList(allPossible); - final int div = allPossible.size(); - int start = ThreadLocalRandom.current().nextInt(div); - // round robin with random start. - for (int i = start; i < start + width; i++) { - Preconditions.checkNotNull(all.get(i % div)); - endpoints.add(all.get(i % div)); - } - } else { + if (endpointAffinityMap.size() > 0) { + List<EndpointAffinity> affinedEPs = Lists.newArrayList(endpointAffinityMap.values()); // get nodes with highest affinity. - Collections.sort(values); - values = Lists.reverse(values); - for (int i = 0; i < width; i++) { - Preconditions.checkNotNull(values.get(i%values.size()).getEndpoint()); - endpoints.add(values.get(i%values.size()).getEndpoint()); + Collections.sort(affinedEPs); + Iterator<EndpointAffinity> affinedEPItr = Iterators.cycle(Lists.reverse(affinedEPs)); + /** Maximum number of slots which should go to endpoints with affinity */ + int affinedSlots = Math.min((Math.max(1, (int) (affinityFactor*width/allEndpoints.size())) * affinedEPs.size()), width); + while(endpoints.size() < affinedSlots) { + EndpointAffinity ea = affinedEPItr.next(); + DrillbitEndpoint endpoint = ea.getEndpoint(); + endpoints.add(endpoint); + } + } + // add other endpoints if required + if (endpoints.size() < width) { + List<DrillbitEndpoint> all = Lists.newArrayList(allEndpoints); + all.removeAll(endpointAffinityMap.keySet()); + // round robin with random start. + Collections.shuffle(all, ThreadLocalRandom.current()); + Iterator<DrillbitEndpoint> otherEPItr = Iterators.cycle(all.size() > 0 ? all : endpointAffinityMap.keySet()); + while (endpoints.size() < width) { + endpoints.add(otherEPItr.next()); } } // Set scan and store endpoints. AssignEndpointsToScanAndStore visitor = new AssignEndpointsToScanAndStore(); node.getRoot().accept(visitor, endpoints); - + // Set the endpoints for this (one at most) sending exchange. if (node.getSendingExchange() != null) { node.getSendingExchange().setupSenders(majorFragmentId, endpoints); @@ -202,4 +207,5 @@ public class Wrapper { Preconditions.checkState(endpointsAssigned); return this.endpoints.get(minorFragmentId); } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index eb1d738..b8edb84 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -121,7 +121,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ cleanupAndSendResult(result); } - public void cancel() { if(isFinished()){ return; @@ -148,8 +147,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } - - /** * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled). */ @@ -160,10 +157,8 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ // convert a run query request into action try{ switch (queryRequest.getType()) { - case LOGICAL: parseAndRunLogicalPlan(queryRequest.getPlan()); - break; case PHYSICAL: parseAndRunPhysicalPlan(queryRequest.getPlan()); @@ -185,7 +180,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } - private void parseAndRunLogicalPlan(String json) { try { @@ -249,8 +243,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } - - private void parseAndRunPhysicalPlan(String json) { try { PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json); @@ -260,7 +252,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } - private void runPhysicalPlan(PhysicalPlan plan) { if(plan.getProperties().resultMode != ResultMode.EXEC){ @@ -278,12 +269,14 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } PlanningSet planningSet = StatsCollector.collectStats(rootFragment); - SimpleParallelizer parallelizer = new SimpleParallelizer(); + SimpleParallelizer parallelizer = new SimpleParallelizer() + .setGlobalMaxWidth(context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH)) + .setMaxWidthPerEndpoint(context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT)) + .setAffinityFactor(context.getConfig().getDouble(ExecConstants.AFFINITY_FACTOR)); try { - QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), - context.getPlanReader(), rootFragment, planningSet, context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH), - context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT)); + QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), + queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet); this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), fragmentManager); List<PlanFragment> leafFragments = Lists.newArrayList(); @@ -292,17 +285,13 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ // store fragments in distributed grid. logger.debug("Storing fragments"); for (PlanFragment f : work.getFragments()) { - // store all fragments in grid since they are part of handshake. - context.getCache().storeFragment(f); if (f.getLeafFragment()) { leafFragments.add(f); } else { intermediateFragments.add(f); } - - } logger.debug("Fragments stored."); @@ -311,7 +300,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ fragmentManager.runFragments(bee, work.getRootFragment(), work.getRootOperator(), initiatingClient, leafFragments, intermediateFragments); logger.debug("Fragments running."); - } catch (ExecutionSetupException | RpcException e) { fail("Failure while setting up query.", e); } @@ -352,7 +340,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ return this.state.getState(); } - class ForemanManagerListener{ void fail(String message, Throwable t) { ForemanManagerListener.this.fail(message, t); @@ -364,13 +351,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } - - @Override public int compareTo(Object o) { return o.hashCode() - o.hashCode(); } - - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/main/resources/drill-module.conf ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 26205bd..f8396bb 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -1,3 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + // This file tells Drill to consider this module when class path scanning. // This file can also include any supplementary configuration information. // This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. @@ -76,6 +91,7 @@ drill.exec: { work: { max.width.per.endpoint: 5, global.max.width: 100, + affinity.factor: 1.2, executor.threads: 4 }, trace: { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/d929faac/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java index 1b38dce..ea19351 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java @@ -59,8 +59,8 @@ public class TestFragmentChecker extends PopUnitTestBase{ endpoints.add(b1); } - - QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet, 10, 5); + par.setGlobalMaxWidth(10).setMaxWidthPerEndpoint(5); + QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet); System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId())); System.out.print(qwu.getRootFragment().getFragmentJson());
