KurtYoung commented on a change in pull request #8499: [FLINK-12575]
[table-planner-blink] Introduce planner rules to remove redundant shuffle and
collation
URL: https://github.com/apache/flink/pull/8499#discussion_r287205763
##########
File path:
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
##########
@@ -242,6 +245,102 @@ class BatchExecOverAggregate(
windowGroupInfo
}
+ override def satisfyTraits(requiredTraitSet: RelTraitSet): RelNode = {
+ val requiredDistribution =
requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
+ val requiredCollation =
requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
+ if (requiredDistribution.getType == ANY &&
requiredCollation.getFieldCollations.isEmpty) {
+ return null
+ }
+
+ val selfProvidedTraitSet = inferProvidedTraitSet()
+ if (selfProvidedTraitSet.satisfies(requiredTraitSet)) {
+ // Current node can satisfy the requiredTraitSet, return the current node
with ProvidedTraitSet
+ return copy(selfProvidedTraitSet, Seq(getInput))
+ }
+
+ val inputFieldCnt = getInput.getRowType.getFieldCount
+ val canSatisfy = if (requiredDistribution.getType == ANY) {
+ true
+ } else {
+ if (!grouping.isEmpty) {
+ if (requiredDistribution.requireStrict) {
+ requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
+ } else {
+ val isAllFieldsFromInput = requiredDistribution.getKeys.forall(_ <
inputFieldCnt)
+ if (isAllFieldsFromInput) {
+ val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
+ if (tableConfig.getConf.getBoolean(
+ PlannerConfigOptions.SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED))
{
+ ImmutableIntList.of(grouping:
_*).containsAll(requiredDistribution.getKeys)
+ } else {
+ requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
+ }
+ } else {
+ // If the required distribution keys do not all come from the input
directly,
+ // the required distribution and collations cannot be satisfied.
+ false
+ }
+ }
+ } else {
+ requiredDistribution.getType == SINGLETON
+ }
+ }
+ // If OverAggregate can provide distribution, but its traits cannot
satisfy required
+ // distribution, cannot push down distribution and collation requirement
(because later
+ // shuffle will destroy previous collation).
+ if (!canSatisfy) {
+ return null
+ }
+
+ var inputRequiredTraits = getInput.getTraitSet
+ var providedTraits = selfProvidedTraitSet
+ val providedCollation =
selfProvidedTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
+ if (!requiredDistribution.isTop) {
+ inputRequiredTraits = inputRequiredTraits.replace(requiredDistribution)
+ providedTraits = providedTraits.replace(requiredDistribution)
+ }
+
+ if (providedCollation.satisfies(requiredCollation)) {
+ // the providedCollation can satisfy the requirement,
+ // so don't push down the sort into its input.
+ } else if (providedCollation.getFieldCollations.isEmpty &&
+ requiredCollation.getFieldCollations.nonEmpty) {
+ // If OverAgg cannot provide collation itself, try to push down
collation requirements into
+ // its input if collation fields all come from input node.
+ val canPushDownCollation = requiredCollation.getFieldCollations
+ .forall(_.getFieldIndex < inputFieldCnt)
+ if (canPushDownCollation) {
+ inputRequiredTraits = inputRequiredTraits.replace(requiredCollation)
+ providedTraits = providedTraits.replace(requiredCollation)
+ }
+ } else {
+ // Don't push down the sort into its input,
+ // because the provided collation will destroy the input's provided
collation.
+ }
+ val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
+ copy(providedTraits, Seq(newInput))
+ }
+
+ private def inferProvidedTraitSet(): RelTraitSet = {
+ var selfProvidedTraitSet = getTraitSet
+ // provided distribution
+ val providedDistribution = if (grouping.nonEmpty) {
+ FlinkRelDistribution.hash(grouping.map(Integer.valueOf).toList,
requireStrict = false)
Review comment:
Should requireStrict be true here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services