HanumathRao commented on a change in pull request #1677: DRILL-7068: Support
memory adjustment framework for resource manageme…
URL: https://github.com/apache/drill/pull/1677#discussion_r263651213
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
##########
@@ -108,27 +111,106 @@ public double getAffinityFactor() {
return affinityFactor;
}
+ public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
+ // The following code finds the root fragments by removing every fragment
+ // that some other fragment depends upon. This is fine because the later
+ // parallelizer code traverses from these root fragments to their respective
+ // dependent fragments.
+ final Set<Wrapper> roots = Sets.newHashSet();
+ for(Wrapper w : planningSet) {
+ roots.add(w);
+ }
+
+ // After this loop, roots contains only the fragments that are not depended
+ // upon by any other fragment.
+ for(Wrapper wrapper : planningSet) {
+ final List<Wrapper> fragmentDependencies =
wrapper.getFragmentDependencies();
+ if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
+ for(Wrapper dependency : fragmentDependencies) {
+ if (roots.contains(dependency)) {
+ roots.remove(dependency);
+ }
+ }
+ }
+ }
+
+ return roots;
+ }
+
+ public PlanningSet prepareFragmentTree(Fragment rootFragment) {
+ PlanningSet planningSet = new PlanningSet();
+
+ initFragmentWrappers(rootFragment, planningSet);
+
+ constructFragmentDependencyGraph(rootFragment, planningSet);
+
+ return planningSet;
+ }
+
+ /**
+ * Traverse all the major fragments and parallelize each major fragment based on
+ * collected stats. The child fragments are parallelized before their parent
+ * fragment.
+ * @param planningSet Set of all major fragments and their context.
+ * @param roots Root nodes of the plan.
+ * @param activeEndpoints currently active drillbit endpoints.
+ * @throws PhysicalOperatorSetupException
+ */
+ public void collectStatsAndParallelizeFragments(PlanningSet planningSet,
Set<Wrapper> roots,
+ Collection<DrillbitEndpoint>
activeEndpoints) throws PhysicalOperatorSetupException {
+ for (Wrapper wrapper : roots) {
+ traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper
fragmentWrapper) -> {
+ // If this fragment is already parallelized then there is no need to do it again.
+ // This happens in the case of fragments which have MUX operators.
+ if (fragmentWrapper.isEndpointsAssignmentDone()) {
+ return;
+ }
+ fragmentWrapper.getNode().getRoot().accept(new
StatsCollector(planningSet), fragmentWrapper);
+ fragmentWrapper.getStats()
+ .getDistributionAffinity()
+ .getFragmentParallelizer()
+ .parallelizeFragment(fragmentWrapper, this,
activeEndpoints);
+ // Consolidate the CPU resources required by this major fragment per drillbit.
+ fragmentWrapper.computeCpuResources();
+ }));
+ }
+ }
+
+ public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper>
roots,
+ Collection<DrillbitEndpoint>
activeEndpoints) throws PhysicalOperatorSetupException;
+
/**
- * Generate a set of assigned fragments based on the provided fragment tree.
Do not allow parallelization stages
- * to go beyond the global max width.
+ * The starting function for the whole parallelization and memory computation logic.
+ * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment.
+ * The topology of this tree is the same as that of the major fragment tree.
+ * 2) Traverse this fragment tree to collect stats for each major fragment and then
+ * parallelize each fragment. At this stage minor fragments are not created, but all
+ * the information required to create the minor fragments is computed.
+ * 3) Memory is computed for each operator and for the minor fragment.
+ * 4) Lastly all the above computed information is used to create the minor fragments
+ * for each major fragment.
*
- * @param options Option list
- * @param foremanNode The driving/foreman node for this query. (this
node)
- * @param queryId The queryId for this query.
- * @param activeEndpoints The list of endpoints to consider for inclusion in
planning this query.
- * @param rootFragment The root node of the PhysicalPlan that we will be
parallelizing.
- * @param session UserSession of user who launched this query.
- * @param queryContextInfo Info related to the context when query has
started.
- * @return The list of generated PlanFragment protobuf objects to be
assigned out to the individual nodes.
+ * @param options List of options set by the user.
+ * @param foremanNode foreman node for this query plan.
+ * @param queryId Query ID.
+ * @param activeEndpoints currently active endpoints on which this plan will run.
+ * @param rootFragment Root major fragment.
+ * @param session session context.
+ * @param queryContextInfo query context.
+ * @return
* @throws ExecutionSetupException
*/
- public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint
foremanNode, QueryId queryId,
- Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
- UserSession session, QueryContextInformation queryContextInfo) throws
ExecutionSetupException {
+ @Override
+ public final QueryWorkUnit generateWorkUnits(OptionList options,
DrillbitEndpoint foremanNode, QueryId queryId,
+ Collection<DrillbitEndpoint>
activeEndpoints, Fragment rootFragment,
+ UserSession session,
QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ PlanningSet planningSet = prepareFragmentTree(rootFragment);
+
+ Set<Wrapper> rootFragments = getRootFragments(planningSet);
+
+ collectStatsAndParallelizeFragments(planningSet, rootFragments,
activeEndpoints);
- final PlanningSet planningSet = getFragmentsHelper(activeEndpoints,
rootFragment);
- return generateWorkUnit(
- options, foremanNode, queryId, rootFragment, planningSet, session,
queryContextInfo);
+ adjustMemory(planningSet, rootFragments, activeEndpoints);
+
+ return generateWorkUnit(options, foremanNode, queryId, rootFragment,
planningSet, session, queryContextInfo);
Review comment:
Yes, you are right.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services