LadyForest commented on code in PR #23505: URL: https://github.com/apache/flink/pull/23505#discussion_r1461562152
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions.sql; + +import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.TableCharacteristic; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.sql.validate.SqlValidator; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.Map; + +/** + * SqlSessionTableFunction implements an operator for per-key sessionization. 
It allows four + * parameters: + * + * <ol> + * <li>table as data source + * <li>a descriptor to provide a watermarked column name from the input table + * <li>a descriptor to provide a column as key, on which sessionization will be applied, optional + * <li>an interval parameter to specify a inactive activity gap to break sessions + * </ol> + * + * <p>Mainly copy from [[{@link org.apache.calcite.sql.SqlSessionTableFunction}]]. FLINK Review Comment: Nit: remove `[[]]`, which serves as the navigator for Scala. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java: ########## @@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) { .makeCall( type, FlinkSqlOperatorTable.TRY_CAST, Collections.singletonList(valueRex)); } + + private boolean isSetSemanticsWindowTableFunction(SqlCall call) { + if (!(call.getOperator() instanceof SqlWindowTableFunction)) { + return false; + } + List<SqlNode> operands = call.getOperandList(); + return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE; + } + + /** + * The partition keys and order keys in ptf will be extracted to {@link + * RexSetSemanticsTableCall}. + * + * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10' + * MINUTE)" as an example. + * + * <pre> + * The original SqlCall: + * + * SqlBasicCall: SESSION + * +- SqlBasicCall: SET_SEMANTICS_TABLE + * +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, `my_table`.`c`, ... FROM ..." 
+ * +- SqlNodeList: (PARTITION KEY) + * +- SqlIdentifier: "b" + * +- SqlIdentifier: "a" + * +- SqlNodeList: (ORDER KEY) + * +- SqlBasicCall: DESCRIPTOR(`rowtime`) + * +- SqlIdentifier: "rowtime" + * +- SqlInternalLiteral: INTERVAL '5' MINUTE + * </pre> + * + * <pre> + * After the process in Calcite: + * + * RexCall: SESSION + * +- RexCall: DESCRIPTOR(`rowtime`) + * +- RexFieldAccess: `rowtime` + * +- RexLiteral: 300000:INTERVAL MINUTE + * </pre> + * + * <pre> + * After we modified: + * + * RexSetSemanticsTableCall: SESSION + * +- PartitionKeys: [0, 1] + * +- OrderKeys: [] + * +- RexCall: DESCRIPTOR(`rowtime`) + * +- RexFieldAccess: `rowtime` + * +- RexLiteral: 300000:INTERVAL MINUTE + * </pre> + */ + private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, final SqlCall call) { + checkState( + call.getOperator() instanceof SqlSessionTableFunction, + "Currently only support SESSION table function in Set Semantics PTF."); + SqlSessionTableFunction fun = (SqlSessionTableFunction) call.getOperator(); + + List<SqlNode> operands = call.getOperandList(); + + SqlBasicCall setSemanticsPTFCall = (SqlBasicCall) operands.get(0); + SqlNodeList partitionKeys = setSemanticsPTFCall.operand(1); + SqlNodeList orderKeys = setSemanticsPTFCall.operand(2); + // RexNode subQuery = cx.convertExpression(setSemanticsPTFCall); Review Comment: Remove it if subQuery is not used. 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java: ########## @@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) { .makeCall( type, FlinkSqlOperatorTable.TRY_CAST, Collections.singletonList(valueRex)); } + + private boolean isSetSemanticsWindowTableFunction(SqlCall call) { + if (!(call.getOperator() instanceof SqlWindowTableFunction)) { + return false; + } + List<SqlNode> operands = call.getOperandList(); + return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE; + } + + /** + * The partition keys and order keys in ptf will be extracted to {@link + * RexSetSemanticsTableCall}. + * + * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10' + * MINUTE)" as an example. + * + * <pre> + * The original SqlCall: + * + * SqlBasicCall: SESSION + * +- SqlBasicCall: SET_SEMANTICS_TABLE + * +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, `my_table`.`c`, ... FROM ..." 
+ * +- SqlNodeList: (PARTITION KEY) + * +- SqlIdentifier: "b" + * +- SqlIdentifier: "a" + * +- SqlNodeList: (ORDER KEY) + * +- SqlBasicCall: DESCRIPTOR(`rowtime`) + * +- SqlIdentifier: "rowtime" + * +- SqlInternalLiteral: INTERVAL '5' MINUTE + * </pre> + * + * <pre> + * After the process in Calcite: + * + * RexCall: SESSION + * +- RexCall: DESCRIPTOR(`rowtime`) + * +- RexFieldAccess: `rowtime` + * +- RexLiteral: 300000:INTERVAL MINUTE + * </pre> + * + * <pre> + * After we modified: + * + * RexSetSemanticsTableCall: SESSION + * +- PartitionKeys: [0, 1] + * +- OrderKeys: [] + * +- RexCall: DESCRIPTOR(`rowtime`) + * +- RexFieldAccess: `rowtime` + * +- RexLiteral: 300000:INTERVAL MINUTE + * </pre> + */ + private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, final SqlCall call) { + checkState( + call.getOperator() instanceof SqlSessionTableFunction, + "Currently only support SESSION table function in Set Semantics PTF."); + SqlSessionTableFunction fun = (SqlSessionTableFunction) call.getOperator(); + + List<SqlNode> operands = call.getOperandList(); + + SqlBasicCall setSemanticsPTFCall = (SqlBasicCall) operands.get(0); + SqlNodeList partitionKeys = setSemanticsPTFCall.operand(1); + SqlNodeList orderKeys = setSemanticsPTFCall.operand(2); + // RexNode subQuery = cx.convertExpression(setSemanticsPTFCall); + checkState(orderKeys.isEmpty(), "SESSION table function does not support order keys."); + RexCall resolvedCall = + (RexCall) StandardConvertletTable.INSTANCE.convertWindowFunction(cx, fun, call); + ImmutableBitSet partitionKeyRefs = getPartitionKeyIndices(cx, partitionKeys); + + // attach the partition keys and order keys on the custom rex call + resolvedCall = + new RexSetSemanticsTableCall( + resolvedCall.getType(), + resolvedCall.getOperator(), + resolvedCall.getOperands(), + partitionKeyRefs, + ImmutableBitSet.builder().build()); + return resolvedCall; + } + + private ImmutableBitSet getPartitionKeyIndices(SqlRexContext cx, SqlNodeList partitions) 
{ + final ImmutableBitSet.Builder result = ImmutableBitSet.builder(); + + for (SqlNode partition : partitions) { + RexNode expr = cx.convertExpression(partition); + result.set(parseFieldIdx(expr)); + } + return result.build(); + } + + private static int parseFieldIdx(RexNode e) { + switch (e.getKind()) { + case FIELD_ACCESS: Review Comment: I was wondering in which condition we might get a `RexFieldAccess`? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java: ########## @@ -19,23 +19,36 @@ package org.apache.flink.table.planner.calcite; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.TableException; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; +import org.apache.flink.table.planner.functions.sql.SqlSessionTableFunction; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexFieldAccess; +import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; import org.apache.calcite.sql.SqlUtil; +import org.apache.calcite.sql.SqlWindowTableFunction; import org.apache.calcite.sql.validate.SqlValidatorImpl; import org.apache.calcite.sql2rel.SqlRexContext; import org.apache.calcite.sql2rel.SqlRexConvertlet; import org.apache.calcite.sql2rel.SqlRexConvertletTable; import org.apache.calcite.sql2rel.StandardConvertletTable; +import org.apache.calcite.util.ImmutableBitSet; import java.util.Collections; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkState; Review Comment: 
```suggestion import static org.apache.flink.util.Preconditions.checkArgument; ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWindowTableFunctionTransposeRule.java: ########## @@ -76,15 +78,22 @@ public void onMatch(RelOptRuleCall call) { LogicalTableFunctionScan scan = call.rel(1); RelNode scanInput = scan.getInput(0); TimeAttributeWindowingStrategy windowingStrategy = - WindowUtil.convertToWindowingStrategy( - (RexCall) scan.getCall(), scanInput.getRowType()); + WindowUtil.convertToWindowingStrategy((RexCall) scan.getCall(), scanInput); // 1. get fields to push down ImmutableBitSet projectFields = RelOptUtil.InputFinder.bits(project.getProjects(), null); int scanInputFieldCount = scanInput.getRowType().getFieldCount(); ImmutableBitSet toPushFields = ImmutableBitSet.range(0, scanInputFieldCount) .intersect(projectFields) .set(windowingStrategy.getTimeAttributeIndex()); + // partition keys in session window tvf also need be pushed down + if (windowingStrategy.getWindow() instanceof SessionWindowSpec) { + SessionWindowSpec sessionWindowSpec = (SessionWindowSpec) windowingStrategy.getWindow(); + int[] partitionKeyIndices = sessionWindowSpec.getPartitionKeyIndices(); + for (int partitionKeyIndex : partitionKeyIndices) { + toPushFields = toPushFields.set(partitionKeyIndex); + } Review Comment: ```suggestion toPushFields = toPushFields.union(ImmutableBitSet.of(partitionKeyIndices)); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java: ########## @@ -160,6 +170,10 @@ public StreamExecWindowAggregate( @Override protected Transformation<RowData> translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { + if (shouldFallbackToGroupWindowAgg(windowing.getWindow())) { + return fallbackToGroupWindowAggregate(planner, config); + } + Review Comment: I suggest inlining the condition check since it's 
straightforward, and meanwhile, updating the method name to emphasize that it's falling back to the legacy impl ```suggestion // TODO Currently, the operator of WindowAggregate does not support Session Window, and it // needs to fall back to the legacy GroupWindowAggregate. See more at FLINK-34048. if (windowing.getWindow() instanceof SessionWindowSpec) { return fallbackToLegacyGroupWindowAggregate(planner, config); } ``` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala: ########## @@ -92,13 +93,57 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W return null } + val windowSpec = childProps.getWindowSpec + // if the window a + if (!validateWindowProperties(windowSpec, mapInToOutPos)) { Review Comment: Nit: `validWindowProperties`. IMO, methods like `validateXXX` is usually with `void` as the return type. However, if the status is required, then the method name should be like `isValidXXX` or `validXXX`. ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala: ########## @@ -392,4 +433,54 @@ object WindowUtil { } false } + + /** + * This method only checks the window that contains partition keys like session window. The Review Comment: like? I think you might want to use the term "for" or "within"? 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java: ########## @@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) { .makeCall( type, FlinkSqlOperatorTable.TRY_CAST, Collections.singletonList(valueRex)); } + + private boolean isSetSemanticsWindowTableFunction(SqlCall call) { + if (!(call.getOperator() instanceof SqlWindowTableFunction)) { + return false; + } + List<SqlNode> operands = call.getOperandList(); + return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE; + } + + /** + * The partition keys and order keys in ptf will be extracted to {@link + * RexSetSemanticsTableCall}. + * + * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10' + * MINUTE)" as an example. + * + * <pre> + * The original SqlCall: + * + * SqlBasicCall: SESSION + * +- SqlBasicCall: SET_SEMANTICS_TABLE + * +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, `my_table`.`c`, ... FROM ..." + * +- SqlNodeList: (PARTITION KEY) + * +- SqlIdentifier: "b" + * +- SqlIdentifier: "a" + * +- SqlNodeList: (ORDER KEY) + * +- SqlBasicCall: DESCRIPTOR(`rowtime`) + * +- SqlIdentifier: "rowtime" + * +- SqlInternalLiteral: INTERVAL '5' MINUTE + * </pre> + * + * <pre> + * After the process in Calcite: + * + * RexCall: SESSION + * +- RexCall: DESCRIPTOR(`rowtime`) + * +- RexFieldAccess: `rowtime` + * +- RexLiteral: 300000:INTERVAL MINUTE + * </pre> + * + * <pre> + * After we modified: + * + * RexSetSemanticsTableCall: SESSION + * +- PartitionKeys: [0, 1] Review Comment: I think the correct order should be [1, 0]. 
########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java: ########## @@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) { .makeCall( type, FlinkSqlOperatorTable.TRY_CAST, Collections.singletonList(valueRex)); } + + private boolean isSetSemanticsWindowTableFunction(SqlCall call) { + if (!(call.getOperator() instanceof SqlWindowTableFunction)) { + return false; + } + List<SqlNode> operands = call.getOperandList(); + return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE; + } + + /** + * The partition keys and order keys in ptf will be extracted to {@link + * RexSetSemanticsTableCall}. + * + * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10' + * MINUTE)" as an example. + * + * <pre> + * The original SqlCall: + * + * SqlBasicCall: SESSION + * +- SqlBasicCall: SET_SEMANTICS_TABLE + * +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, `my_table`.`c`, ... FROM ..." 
+ * +- SqlNodeList: (PARTITION KEY) + * +- SqlIdentifier: "b" + * +- SqlIdentifier: "a" + * +- SqlNodeList: (ORDER KEY) + * +- SqlBasicCall: DESCRIPTOR(`rowtime`) + * +- SqlIdentifier: "rowtime" + * +- SqlInternalLiteral: INTERVAL '5' MINUTE + * </pre> + * + * <pre> + * After the process in Calcite: + * + * RexCall: SESSION + * +- RexCall: DESCRIPTOR(`rowtime`) + * +- RexFieldAccess: `rowtime` + * +- RexLiteral: 300000:INTERVAL MINUTE + * </pre> + * + * <pre> + * After we modified: + * + * RexSetSemanticsTableCall: SESSION + * +- PartitionKeys: [0, 1] + * +- OrderKeys: [] + * +- RexCall: DESCRIPTOR(`rowtime`) + * +- RexFieldAccess: `rowtime` + * +- RexLiteral: 300000:INTERVAL MINUTE + * </pre> + */ + private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, final SqlCall call) { + checkState( + call.getOperator() instanceof SqlSessionTableFunction, + "Currently only support SESSION table function in Set Semantics PTF."); + SqlSessionTableFunction fun = (SqlSessionTableFunction) call.getOperator(); + Review Comment: ```suggestion checkArgument( call.getOperator() instanceof SqlSessionTableFunction, "Currently, only the SESSION table function is supported in Set Semantics PTF."); SqlSessionTableFunction fun = (SqlSessionTableFunction) call.getOperator(); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java: ########## @@ -5335,6 +5364,11 @@ public RexNode convertExpression(SqlNode expr) { root = convertQueryRecursive(query, false, null); return RexSubQuery.multiset(root.rel); + // case SET_SEMANTICS_TABLE: + // call = (SqlCall) expr; + // query = call.operand(0); + // root = convertQueryRecursive(query, false, null); + // return RexSubQuery.setSemanticsTable(root.rel); Review Comment: Remove this. Nit: compared with the master branch to check diff, rather than the base commit. 
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala: ########## @@ -234,11 +235,45 @@ object WindowUtil { val step = getOperandAsLong(windowCall.operands(1)) val maxSize = getOperandAsLong(windowCall.operands(2)) new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset) + case FlinkSqlOperatorTable.SESSION => + windowCall match { + // with syntax partition key + case setSemanticsTableCall: RexSetSemanticsTableCall => + if (!setSemanticsTableCall.getOrderKeys.isEmpty) { + throw new ValidationException("Session window TVF doesn't support order by clause.") + } + val gap = getOperandAsLong(windowCall.operands(1)) + new SessionWindowSpec( + Duration.ofMillis(gap), + windowCall.asInstanceOf[RexSetSemanticsTableCall].getPartitionKeys.toArray) + // without syntax partition key Review Comment: I'm a little bit confused. IIUC, no matter w/o the `partition by` clause, the query is always translated to `RexSetSemanticsTableCall`. Why differentiate them here? ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexSetSemanticsTableCall.java: ########## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.calcite; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlSyntax; +import org.apache.calcite.util.ImmutableBitSet; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * A special {@link RexCall} that is used to represent a table function with set semantics. See more + * details in {@link FlinkConvertletTable#convertSetSemanticsWindowTableFunction}. + */ +public class RexSetSemanticsTableCall extends RexCall { + + private final ImmutableBitSet partitionKeys; + + private final ImmutableBitSet orderKeys; + + public RexSetSemanticsTableCall( + RelDataType type, + SqlOperator operator, + List<? extends RexNode> operands, + ImmutableBitSet partitionKeys, + ImmutableBitSet orderKeys) { + super(type, operator, operands); + this.partitionKeys = partitionKeys; + this.orderKeys = orderKeys; + } + + public ImmutableBitSet getPartitionKeys() { + return partitionKeys; + } + + public ImmutableBitSet getOrderKeys() { + return orderKeys; + } + + @Override + protected String computeDigest(boolean withType) { Review Comment: How about ```java @Override protected String computeDigest(boolean withType) { if ((operands.isEmpty()) && (op.getSyntax() == SqlSyntax.FUNCTION_ID)) { return super.computeDigest(withType); } final StringBuilder sb = new StringBuilder(op.getName()); sb.append("("); appendKeys(partitionKeys, "PARTITION BY", sb); appendKeys(orderKeys, "ORDER BY", sb); appendOperands(sb); sb.append(")"); if (withType) { sb.append(":"); // NOTE jvs 16-Jan-2005: for digests, it is very important // to use the full type string. 
sb.append(type.getFullTypeString()); } return sb.toString(); } private void appendKeys(ImmutableBitSet keys, String prefix, StringBuilder sb) { sb.append( keys.asList().stream() .map(key -> "$" + key) .collect(Collectors.joining(",", prefix + "(", "), "))); } ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java: ########## @@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final SqlCall call) { .makeCall( type, FlinkSqlOperatorTable.TRY_CAST, Collections.singletonList(valueRex)); } + + private boolean isSetSemanticsWindowTableFunction(SqlCall call) { + if (!(call.getOperator() instanceof SqlWindowTableFunction)) { + return false; + } + List<SqlNode> operands = call.getOperandList(); + return !operands.isEmpty() && operands.get(0).getKind() == SqlKind.SET_SEMANTICS_TABLE; + } + + /** + * The partition keys and order keys in ptf will be extracted to {@link + * RexSetSemanticsTableCall}. + * + * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10' + * MINUTE)" as an example. + * + * <pre> + * The original SqlCall: + * + * SqlBasicCall: SESSION + * +- SqlBasicCall: SET_SEMANTICS_TABLE + * +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, `my_table`.`c`, ... FROM ..." + * +- SqlNodeList: (PARTITION KEY) + * +- SqlIdentifier: "b" + * +- SqlIdentifier: "a" + * +- SqlNodeList: (ORDER KEY) + * +- SqlBasicCall: DESCRIPTOR(`rowtime`) + * +- SqlIdentifier: "rowtime" + * +- SqlInternalLiteral: INTERVAL '5' MINUTE Review Comment: ```suggestion * Due to CALCITE-6204, we need to manually extract partition keys and order keys and convert * them to {@link RexSetSemanticsTableCall}. * * <p>Take `SESSION(TABLE my_table PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '10' * MINUTE)` as an example. 
* * <p>The original SqlNode tree after syntax parse looks like * * <pre> * SqlBasicCall: SESSION * ├─ SqlBasicCall: SET_SEMANTICS_TABLE * │ ├─ SqlSelect: "SELECT ... FROM ..." * │ ├─ SqlNodeList: (PARTITION KEY) * │ │ ├─ SqlIdentifier: "b" * │ │ └─ SqlIdentifier: "a" * │ └─ SqlNodeList: (ORDER KEY) * ├─ SqlBasicCall: DESCRIPTOR(`rowtime`) * │ └─ SqlIdentifier: "rowtime" * └─ SqlInternalLiteral: INTERVAL '5' MINUTE * </pre> * * <p>Calcite will skip the first operand of SESSION operator, which leads to the following * wrong rex call * * <pre> * RexCall: SESSION * ├─ RexCall: DESCRIPTOR(`rowtime`) * │ └─ RexFieldAccess: `rowtime` * └─ RexLiteral: 300000:INTERVAL MINUTE * </pre> * * <p>As a workaround, we flatten the inner sql call and convert it to a customized {@link * RexSetSemanticsTableCall} to preserve partition keys and order keys * * <pre> * RexSetSemanticsTableCall: SESSION * ├─ PartitionKeys: [1, 0] * ├─ OrderKeys: [] * ├─ RexCall: DESCRIPTOR(`rowtime`) * │ └─ RexFieldAccess: `rowtime` * └─ RexLiteral: 300000:INTERVAL MINUTE * </pre> ``` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala: ########## @@ -92,13 +93,57 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W return null } + val windowSpec = childProps.getWindowSpec + // if the window a + if (!validateWindowProperties(windowSpec, mapInToOutPos)) { + // If the window becomes illegal after passing through calc or project, + // return no window properties + return null + } + childProps.copy( transformColumnIndex(childProps.getWindowStartColumns, mapInToOutPos), transformColumnIndex(childProps.getWindowEndColumns, mapInToOutPos), - transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos) + transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos), + updateWindowSpec(windowSpec, mapInToOutPos) ) } + /** + * Validate the window properties when passing through calc or 
project. + * + * This method only checks the window that contains partition keys like session window. See more Review Comment: like => for? ########## flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java: ########## @@ -1432,9 +1431,17 @@ private void substituteSubQuery(Blackboard bb, SubQuery subQuery) { bb.cursors.add(converted.r); return; case SET_SEMANTICS_TABLE: - if (!config.isExpand()) { - return; - } + // ----- FLINK MODIFICATION BEGIN ----- + // For two reasons, we always expand SET SEMANTICS TABLE + // 1. Calcite has a bug when not expanding the SET SEMANTICS TABLE. See more in + // CALCITE-6204. + // 2. Currently, Flink’s built-in Session Window TVF is the only SET SEMANTICS + // TABLE. We will expand it by default like other built-in window TVFs to reuse some + // subsequent processing logic and optimization logic. + // if (!config.isExpand()) { + // return; + // } Review Comment: ```suggestion // We always expand the SET SEMANTICS TABLE for two reasons: // 1. Calcite has a bug when not expanding the SET SEMANTICS TABLE. For more // information, see CALCITE-6204. // 2. Currently, Flink’s built-in Session Window TVF is the only PTF with SET // SEMANTICS. We will expand it by default, like other built-in window TVFs, to // reuse some subsequent processing and optimization logic. 
// if (!config.isExpand()) { // return; // } ``` ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala: ########## @@ -92,13 +93,57 @@ class FlinkRelMdWindowProperties private extends MetadataHandler[FlinkMetadata.W return null } + val windowSpec = childProps.getWindowSpec + // if the window a + if (!validateWindowProperties(windowSpec, mapInToOutPos)) { + // If the window becomes illegal after passing through calc or project, + // return no window properties + return null + } + childProps.copy( transformColumnIndex(childProps.getWindowStartColumns, mapInToOutPos), transformColumnIndex(childProps.getWindowEndColumns, mapInToOutPos), - transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos) + transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos), + updateWindowSpec(windowSpec, mapInToOutPos) ) } + /** + * Validate the window properties when passing through calc or project. + * + * This method only checks the window that contains partition keys like session window. See more + * at [[WindowUtil.validatePartitionKeyIfNecessary]] + */ + private def validateWindowProperties( + windowSpec: WindowSpec, + mapInToOutPos: JHashMap[Int, JList[Int]]): Boolean = { + windowSpec match { + case session: SessionWindowSpec => + val oldPartitionKeys = session.getPartitionKeyIndices + oldPartitionKeys.forall(oldPartitionKey => mapInToOutPos.contains(oldPartitionKey)) + case _ => true + } + } + + /** Update partition key indices in session window spec according to the projection map. 
*/ + private def updateWindowSpec( + oldWindowSpec: WindowSpec, + mapInToOutPos: JHashMap[Int, JList[Int]]): WindowSpec = { + oldWindowSpec match { + case session: SessionWindowSpec => + val oldPartitionKeys = session.getPartitionKeyIndices + val newPartitionKeys = oldPartitionKeys.map( + oldPartitionKey => { + val newPartitionKey = mapInToOutPos.get(oldPartitionKey) + Preconditions.checkState(newPartitionKey.size == 1) Review Comment: Nit: checkArgument? ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.scala: ########## @@ -146,6 +152,15 @@ class PullUpWindowTableFunctionIntoWindowAggregateRule call.transformTo(newWindowAgg) } + + private def unwrapRel(rel: RelNode): RelNode = { Review Comment: Nit: add `@tailrec` annotation ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala: ########## @@ -317,11 +352,17 @@ object WindowUtil { /** * For rowtime window, return true if the given aggregate grouping contains window start and end. * For proctime window, we should also check if it exists a neighbour windowTableFunctionCall. + * + * If the window is a session window, we should also check if the partition keys are the same as + * the group keys. See more at [[WindowUtil.validateGroupKeyPartitionKeyIfNecessary()]]. */ def isValidWindowAggregate(agg: FlinkLogicalAggregate): Boolean = { val fmq = FlinkRelMetadataQuery.reuseOrCreate(agg.getCluster.getMetadataQuery) val windowProperties = fmq.getRelWindowProperties(agg.getInput) val grouping = agg.getGroupSet + if (!validateGroupKeyPartitionKeyIfNecessary(grouping, windowProperties)) { Review Comment: `validGroupKeyPartitionKeyIfNecessary` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
