vvysotskyi commented on a change in pull request #1907: DRILL-7450: Improve
performance for ANALYZE command
URL: https://github.com/apache/drill/pull/1907#discussion_r351216626
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MetadataAggPrule.java
##########
@@ -40,21 +52,182 @@ public MetadataAggPrule() {
@Override
public void onMatch(RelOptRuleCall call) {
- MetadataAggRel relNode = call.rel(0);
- RelNode input = relNode.getInput();
-
- int groupByExprsSize = relNode.getContext().groupByExpressions().size();
-
- // group by expressions will be returned first
- RelCollation collation = RelCollations.of(IntStream.range(1,
groupByExprsSize)
- .mapToObj(RelFieldCollation::new)
- .collect(Collectors.toList()));
-
- // TODO: update DrillDistributionTrait when implemented parallelization
for metadata collecting (see DRILL-7433)
- RelTraitSet traits =
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
- traits = groupByExprsSize > 0 ? traits.plus(collation) : traits;
- RelNode convertedInput = convert(input, traits);
- call.transformTo(
- new MetadataAggPrel(relNode.getCluster(), traits, convertedInput,
relNode.getContext()));
+ MetadataAggRel aggregate = call.rel(0);
+ RelNode input = aggregate.getInput();
+
+ int groupByExprsSize = aggregate.getContext().groupByExpressions().size();
+
+ List<RelFieldCollation> collations = new ArrayList<>();
+ List<String> names = new ArrayList<>();
+ for (int i = 0; i < groupByExprsSize; i++) {
+ collations.add(new RelFieldCollation(i + 1));
+ SchemaPath fieldPath =
getArgumentReference(aggregate.getContext().groupByExpressions().get(i));
+ names.add(fieldPath.getRootSegmentPath());
+ }
+
+ RelCollation collation = new NamedRelCollation(collations, names);
+
+ RelTraitSet traits;
+
+ try {
+ if (aggregate.getContext().groupByExpressions().isEmpty()) {
+ DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
+ RelTraitSet singleDistTrait =
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+
+ createTransformRequest(call, aggregate, input, singleDistTrait);
+ } else {
+ // hash distribute on all grouping keys
+ DrillDistributionTrait distOnAllKeys =
+ new
DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+
ImmutableList.copyOf(getDistributionFields(aggregate.getContext().groupByExpressions())));
+
+ PlannerSettings settings =
PrelUtil.getPlannerSettings(call.getPlanner());
+ boolean smallInput =
+ input.estimateRowCount(input.getCluster().getMetadataQuery()) <
settings.getSliceTarget();
+
+ // force 2-phase aggregation for bottom aggregate call
+ // to produce sort locally before aggregation is produced for large
inputs
+ if (aggregate.getContext().createNewAggregations() && !smallInput) {
+ traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL);
+ RelNode convertedInput = convert(input, traits);
+
+ new TwoPhaseMetadataAggSubsetTransformer(call, collation,
distOnAllKeys)
+ .go(aggregate, convertedInput);
+ } else {
+ // TODO: DRILL-7433 - replace DrillDistributionTrait.SINGLETON with
distOnAllKeys when parallelization for MetadataHandler is implemented
+ traits =
call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(DrillDistributionTrait.SINGLETON);
+ createTransformRequest(call, aggregate, input, traits);
+ }
+ }
+ } catch (InvalidRelException e) {
+ throw new RuntimeException(e);
Review comment:
There is no need for the try-catch block anymore; I updated
`TwoPhaseMetadataAggSubsetTransformer` to remove the exception from the method
signature.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services