KurtYoung commented on a change in pull request #8499: [FLINK-12575] 
[table-planner-blink] Introduce planner rules to remove redundant shuffle and 
collation
URL: https://github.com/apache/flink/pull/8499#discussion_r287205763
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecOverAggregate.scala
 ##########
 @@ -242,6 +245,102 @@ class BatchExecOverAggregate(
     windowGroupInfo
   }
 
 +  override def satisfyTraits(requiredTraitSet: RelTraitSet): RelNode = {
 +    val requiredDistribution = requiredTraitSet.getTrait(FlinkRelDistributionTraitDef.INSTANCE)
 +    val requiredCollation = requiredTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
 +    if (requiredDistribution.getType == ANY && requiredCollation.getFieldCollations.isEmpty) {
 +      return null
 +    }
 +
 +    val selfProvidedTraitSet = inferProvidedTraitSet()
 +    if (selfProvidedTraitSet.satisfies(requiredTraitSet)) {
 +      // Current node can satisfy the requiredTraitSet, return the current node with ProvidedTraitSet
 +      return copy(selfProvidedTraitSet, Seq(getInput))
 +    }
 +
 +    val inputFieldCnt = getInput.getRowType.getFieldCount
 +    val canSatisfy = if (requiredDistribution.getType == ANY) {
 +      true
 +    } else {
 +      if (!grouping.isEmpty) {
 +        if (requiredDistribution.requireStrict) {
 +          requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
 +        } else {
 +          val isAllFieldsFromInput = requiredDistribution.getKeys.forall(_ < inputFieldCnt)
 +          if (isAllFieldsFromInput) {
 +            val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
 +            if (tableConfig.getConf.getBoolean(
 +              PlannerConfigOptions.SQL_OPTIMIZER_SHUFFLE_PARTIAL_KEY_ENABLED)) {
 +              ImmutableIntList.of(grouping: _*).containsAll(requiredDistribution.getKeys)
 +            } else {
 +              requiredDistribution.getKeys == ImmutableIntList.of(grouping: _*)
 +            }
 +          } else {
 +            // If the required distribution keys do not all come directly from the input,
 +            // the required distribution and collations cannot be satisfied.
 +            false
 +          }
 +        }
 +      } else {
 +        requiredDistribution.getType == SINGLETON
 +      }
 +    }
 +    // If OverAggregate can provide a distribution but its traits cannot satisfy the required
 +    // distribution, do not push down the distribution and collation requirements
 +    // (because a later shuffle would destroy the previous collation).
 +    if (!canSatisfy) {
 +      return null
 +    }
 +
 +    var inputRequiredTraits = getInput.getTraitSet
 +    var providedTraits = selfProvidedTraitSet
 +    val providedCollation = selfProvidedTraitSet.getTrait(RelCollationTraitDef.INSTANCE)
 +    if (!requiredDistribution.isTop) {
 +      inputRequiredTraits = inputRequiredTraits.replace(requiredDistribution)
 +      providedTraits = providedTraits.replace(requiredDistribution)
 +    }
 +
 +    if (providedCollation.satisfies(requiredCollation)) {
 +      // The provided collation can satisfy the requirement,
 +      // so don't push the sort down into the input.
 +    } else if (providedCollation.getFieldCollations.isEmpty &&
 +      requiredCollation.getFieldCollations.nonEmpty) {
 +      // If OverAgg cannot provide the collation itself, try to push the collation requirement
 +      // down into its input if all collation fields come from the input node.
 +      val canPushDownCollation = requiredCollation.getFieldCollations
 +        .forall(_.getFieldIndex < inputFieldCnt)
 +      if (canPushDownCollation) {
 +        inputRequiredTraits = inputRequiredTraits.replace(requiredCollation)
 +        providedTraits = providedTraits.replace(requiredCollation)
 +      }
 +    } else {
 +      // Don't push the sort down into the input,
 +      // because this node's provided collation would destroy the input's provided collation.
 +    }
 +    val newInput = RelOptRule.convert(getInput, inputRequiredTraits)
 +    copy(providedTraits, Seq(newInput))
 +  }
 +
 +  private def inferProvidedTraitSet(): RelTraitSet = {
 +    var selfProvidedTraitSet = getTraitSet
 +    // provided distribution
 +    val providedDistribution = if (grouping.nonEmpty) {
 +      FlinkRelDistribution.hash(grouping.map(Integer.valueOf).toList, requireStrict = false)
 
 Review comment:
   requireStrict should be true?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
