lincoln-lil commented on code in PR #25380:
URL: https://github.com/apache/flink/pull/25380#discussion_r1774875365
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RankUtil.scala:
##########
@@ -352,21 +352,55 @@ object RankUtil {
}
val inputRowType = rank.getInput.getRowType
- val isSortOnTimeAttribute = sortOnTimeAttribute(sortCollation,
inputRowType)
+ val isSortOnTimeAttribute = sortOnTimeAttributeOnly(sortCollation,
inputRowType)
!rank.outputRankNumber && isLimit1 && isSortOnTimeAttribute &&
isRowNumberType
}
- private def sortOnTimeAttribute(
+ private def sortOnTimeAttributeOnly(
sortCollation: RelCollation,
inputRowType: RelDataType): Boolean = {
if (sortCollation.getFieldCollations.size() != 1) {
- false
- } else {
- val firstSortField = sortCollation.getFieldCollations.get(0)
- val fieldType =
inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
- FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
- FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+ return false
+ }
+ val firstSortField = sortCollation.getFieldCollations.get(0)
+ val fieldType =
inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+ FlinkTypeFactory.isProctimeIndicatorType(fieldType) ||
+ FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+ }
+
+ /**
+ * Checks if the given sort collation has a field collation that is based on a
rowtime attribute.
+ */
+ def sortOnRowTime(sortCollation: RelCollation, inputRowType: RelDataType):
Boolean = {
+ sortCollation.getFieldCollations.exists {
+ firstSortField =>
+ val fieldType =
inputRowType.getFieldList.get(firstSortField.getFieldIndex).getType
+ FlinkTypeFactory.isRowtimeIndicatorType(fieldType)
+ }
+ }
+
+ /** Whether the given rank is logically a deduplication. */
+ def isDeduplication(rank: Rank): Boolean = {
+ !rank.outputRankNumber && rank.rankType == RankType.ROW_NUMBER &&
isTop1(rank.rankRange)
+ }
+
+ /** Whether the given [[StreamPhysicalRank]] could be converted to
[[StreamExecDeduplicate]]. */
+ def canConvertToDeduplicate(rank: StreamPhysicalRank): Boolean = {
+ lazy val inputInsertOnly = ChangelogPlanUtils.inputInsertOnly(rank)
+ lazy val sortOnTimeAttributeOnly =
+ RankUtil.sortOnTimeAttributeOnly(rank.orderKey, rank.getInput.getRowType)
+
+ isDeduplication(rank) && inputInsertOnly && sortOnTimeAttributeOnly
+ }
+
+ /** Determines if the given order key indicates that the last row should be
kept. */
+ def keepLastRow(orderKey: RelCollation): Boolean = {
Review Comment:
TODO: rename this method (`keepLastRow`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]