godfreyhe commented on a change in pull request #16620:
URL: https://github.com/apache/flink/pull/16620#discussion_r683069888
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
.collect(Collectors.toList());
}
+    private FlinkLogicalWindowAggregate visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
Review comment:
It would be better to add some tests to WindowAggregateTest to cover these
changes.
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
.collect(Collectors.toList());
}
+    private FlinkLogicalWindowAggregate visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
+        RelNode newInput = convertAggInput(agg);
+        List<AggregateCall> updatedAggCalls = convertAggregateCalls(agg);
+        LogicalWindow oldWindow = agg.getWindow();
+        Seq<PlannerNamedWindowProperty> oldNamedProperties = agg.getNamedProperties();
+        FieldReferenceExpression oldTimeAttribute = agg.getWindow().timeAttribute();
+        LogicalType oldTimeAttributeType = oldTimeAttribute.getOutputDataType().getLogicalType();
+        boolean isRowtimeIndicator = LogicalTypeChecks.isRowtimeAttribute(oldTimeAttributeType);
+        boolean convertedToRowtimeTimestampLtz;
+        if (!isRowtimeIndicator) {
+            convertedToRowtimeTimestampLtz = false;
+        } else {
+            int timeIndicatorIdx = oldTimeAttribute.getFieldIndex();
+            RelDataType oldType =
+                    agg.getInput().getRowType().getFieldList().get(timeIndicatorIdx).getType();
+            RelDataType newType =
+                    newInput.getRowType().getFieldList().get(timeIndicatorIdx).getType();
+            convertedToRowtimeTimestampLtz =
+                    isTimestampLtzType(newType) && !isTimestampLtzType(oldType);
+        }
+        LogicalWindow newWindow;
+        Seq<PlannerNamedWindowProperty> newNamedProperties;
+        if (convertedToRowtimeTimestampLtz) {
+            // MATCH_ROWTIME may be converted from rowtime attribute to timestamp_ltz rowtime
+            // attribute, if time indicator of current window aggregate depends on input
+            // MATCH_ROWTIME, we should rewrite logicalWindow and namedProperties.
+            LogicalType newTimestampLtzType =
+                    new LocalZonedTimestampType(
+                            oldTimeAttributeType.isNullable(), TimestampKind.ROWTIME, 3);
+            FieldReferenceExpression newFieldRef =
+                    new FieldReferenceExpression(
+                            oldTimeAttribute.getName(),
+                            fromLogicalTypeToDataType(newTimestampLtzType),
+                            oldTimeAttribute.getInputIndex(),
+                            oldTimeAttribute.getFieldIndex());
+            PlannerWindowReference newAlias =
+                    new PlannerWindowReference(
+                            oldWindow.aliasAttribute().getName(), newTimestampLtzType);
+            if (oldWindow instanceof TumblingGroupWindow) {
+                TumblingGroupWindow window = (TumblingGroupWindow) oldWindow;
+                newWindow = new TumblingGroupWindow(newAlias, newFieldRef, window.size());
+            } else if (oldWindow instanceof SlidingGroupWindow) {
+                SlidingGroupWindow window = (SlidingGroupWindow) oldWindow;
+                newWindow =
+                        new SlidingGroupWindow(
+                                newAlias, newFieldRef, window.size(), window.slide());
+            } else if (oldWindow instanceof SessionGroupWindow) {
+                SessionGroupWindow window = (SessionGroupWindow) oldWindow;
+                newWindow = new SessionGroupWindow(newAlias, newFieldRef, window.gap());
+            } else {
+                throw new TableException(
+                        String.format(
+                                "This is a bug and should not happen. Please file an issue. Invalid window %s.",
+                                oldWindow.getClass().getSimpleName()));
+            }
+            List<PlannerNamedWindowProperty> newNamedPropertiesList =
+                    JavaConverters.seqAsJavaListConverter(oldNamedProperties).asJava().stream()
+                            .map(
+                                    namedProperty -> {
+                                        if (namedProperty.getProperty()
+                                                instanceof PlannerRowtimeAttribute) {
+                                            return new PlannerNamedWindowProperty(
+                                                    namedProperty.getName(),
+                                                    new PlannerRowtimeAttribute(newAlias));
+                                        } else {
+                                            return namedProperty;
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+            newNamedProperties =
+                    JavaConverters.iterableAsScalaIterableConverter(newNamedPropertiesList)
+                            .asScala()
+                            .toSeq();
+        } else {
+            newWindow = oldWindow;
+            newNamedProperties = oldNamedProperties;
+        }
+        return new FlinkLogicalWindowAggregate(
+                agg.getCluster(),
+                agg.getTraitSet(),
+                newInput,
+                agg.getGroupSet(),
+                updatedAggCalls,
+                newWindow,
+                newNamedProperties);
+    }
+
+    private RelNode visitInvalidRel(RelNode node) {
+        throw new TableException(
+                String.format(
+                        "This is a bug and should not happen. Please file an issue. Unknown node %s.",
+                        node.getRelTypeName()));
+    }
+
     // ----------------------------------------------------------------------------------------
     // Utility
     // ----------------------------------------------------------------------------------------
     private RelNode materializeProcTime(RelNode node) {
+        // If input is empty values, ignore materialize
+        if (node instanceof FlinkLogicalValues
+                && FlinkLogicalValues.isEmpty((FlinkLogicalValues) node)) {
+            return node;
+        }
Review comment:
Please add a comment in the code explaining why materialization is skipped
for empty values.
##########
File path:
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeduplicateTest.xml
##########
@@ -207,7 +207,7 @@ LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], rowNum=[$5])
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
-Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1:BIGINT AS rowNum])
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, 1:BIGINT AS $5])
Review comment:
I created a similar issue in the Calcite community:
https://issues.apache.org/jira/browse/CALCITE-2718. Julian's view is: "We
don't guarantee to preserve names when planner rules are applied." But I think
we should preserve them as much as possible to make the plan easier to read,
because the names are displayed in the execution graph in the Flink web UI.
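
To illustrate the idea of keeping names where possible, here is a minimal,
self-contained sketch. It is not Flink or Calcite code: `FieldNameRestorer` and
`restore` are hypothetical names, and the sketch assumes a generated name
always has the form `$<index>`. After a rewrite, any field whose name is such a
generated positional name falls back to the original name at the same index.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only; not part of Flink or Calcite. */
public final class FieldNameRestorer {
    private FieldNameRestorer() {}

    /**
     * Returns the field names to use after a rewrite. A rewritten name equal to
     * "$" + index is treated as generated (an assumption of this sketch) and is
     * replaced by the original name at that index; other names are kept.
     */
    public static List<String> restore(List<String> original, List<String> rewritten) {
        if (original.size() != rewritten.size()) {
            throw new IllegalArgumentException("field counts differ");
        }
        List<String> result = new ArrayList<>(rewritten.size());
        for (int i = 0; i < rewritten.size(); i++) {
            String name = rewritten.get(i);
            boolean generated = name.equals("$" + i);
            result.add(generated ? original.get(i) : name);
        }
        return result;
    }
}
```

Applied to the plan above, the original names `[a, b, c, proctime, rowtime,
rowNum]` and the rewritten names `[a, b, c, proctime, rowtime, $5]` would yield
`rowNum` again for the last field.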
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
##########
@@ -544,11 +595,110 @@ private RelNode convertAggInput(Aggregate agg) {
.collect(Collectors.toList());
}
+    private FlinkLogicalWindowAggregate visitWindowAggregate(FlinkLogicalWindowAggregate agg) {
Review comment:
Makes sense.