LadyForest commented on code in PR #23505:
URL: https://github.com/apache/flink/pull/23505#discussion_r1461562152


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.sql;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableMap;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.TableCharacteristic;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.validate.SqlValidator;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+import java.util.Map;
+
+/**
+ * SqlSessionTableFunction implements an operator for per-key sessionization. 
It allows four
+ * parameters:
+ *
+ * <ol>
+ *   <li>table as data source
+ *   <li>a descriptor to provide a watermarked column name from the input table
+ *   <li>a descriptor to provide a column as key, on which sessionization will 
be applied, optional
+ *   <li>an interval parameter to specify a inactive activity gap to break 
sessions
+ * </ol>
+ *
+ * <p>Mainly copy from [[{@link 
org.apache.calcite.sql.SqlSessionTableFunction}]]. FLINK

Review Comment:
   Nit: remove `[[]]`, which serves as the navigator for Scala.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##########
@@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final 
SqlCall call) {
                 .makeCall(
                         type, FlinkSqlOperatorTable.TRY_CAST, 
Collections.singletonList(valueRex));
     }
+
+    private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
+        if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
+            return false;
+        }
+        List<SqlNode> operands = call.getOperandList();
+        return !operands.isEmpty() && operands.get(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE;
+    }
+
+    /**
+     * The partition keys and order keys in ptf will be extracted to {@link
+     * RexSetSemanticsTableCall}.
+     *
+     * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '10'
+     * MINUTE)" as an example.
+     *
+     * <pre>
+     *     The original SqlCall:
+     *
+     *     SqlBasicCall: SESSION
+     *     +- SqlBasicCall: SET_SEMANTICS_TABLE
+     *       +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, 
`my_table`.`c`, ... FROM ..."
+     *       +- SqlNodeList: (PARTITION KEY)
+     *          +- SqlIdentifier: "b"
+     *          +- SqlIdentifier: "a"
+     *       +- SqlNodeList: (ORDER KEY)
+     *     +- SqlBasicCall: DESCRIPTOR(`rowtime`)
+     *       +- SqlIdentifier: "rowtime"
+     *     +- SqlInternalLiteral: INTERVAL '5' MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After the process in Calcite:
+     *
+     *     RexCall: SESSION
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After we modified:
+     *
+     *     RexSetSemanticsTableCall: SESSION
+     *     +- PartitionKeys: [0, 1]
+     *     +- OrderKeys: []
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     */
+    private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, 
final SqlCall call) {
+        checkState(
+                call.getOperator() instanceof SqlSessionTableFunction,
+                "Currently only support SESSION table function in Set 
Semantics PTF.");
+        SqlSessionTableFunction fun = (SqlSessionTableFunction) 
call.getOperator();
+
+        List<SqlNode> operands = call.getOperandList();
+
+        SqlBasicCall setSemanticsPTFCall = (SqlBasicCall) operands.get(0);
+        SqlNodeList partitionKeys = setSemanticsPTFCall.operand(1);
+        SqlNodeList orderKeys = setSemanticsPTFCall.operand(2);
+        //        RexNode subQuery = cx.convertExpression(setSemanticsPTFCall);

Review Comment:
   Remove it if subQuery is not used.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##########
@@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final 
SqlCall call) {
                 .makeCall(
                         type, FlinkSqlOperatorTable.TRY_CAST, 
Collections.singletonList(valueRex));
     }
+
+    private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
+        if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
+            return false;
+        }
+        List<SqlNode> operands = call.getOperandList();
+        return !operands.isEmpty() && operands.get(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE;
+    }
+
+    /**
+     * The partition keys and order keys in ptf will be extracted to {@link
+     * RexSetSemanticsTableCall}.
+     *
+     * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '10'
+     * MINUTE)" as an example.
+     *
+     * <pre>
+     *     The original SqlCall:
+     *
+     *     SqlBasicCall: SESSION
+     *     +- SqlBasicCall: SET_SEMANTICS_TABLE
+     *       +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, 
`my_table`.`c`, ... FROM ..."
+     *       +- SqlNodeList: (PARTITION KEY)
+     *          +- SqlIdentifier: "b"
+     *          +- SqlIdentifier: "a"
+     *       +- SqlNodeList: (ORDER KEY)
+     *     +- SqlBasicCall: DESCRIPTOR(`rowtime`)
+     *       +- SqlIdentifier: "rowtime"
+     *     +- SqlInternalLiteral: INTERVAL '5' MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After the process in Calcite:
+     *
+     *     RexCall: SESSION
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After we modified:
+     *
+     *     RexSetSemanticsTableCall: SESSION
+     *     +- PartitionKeys: [0, 1]
+     *     +- OrderKeys: []
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     */
+    private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, 
final SqlCall call) {
+        checkState(
+                call.getOperator() instanceof SqlSessionTableFunction,
+                "Currently only support SESSION table function in Set 
Semantics PTF.");
+        SqlSessionTableFunction fun = (SqlSessionTableFunction) 
call.getOperator();
+
+        List<SqlNode> operands = call.getOperandList();
+
+        SqlBasicCall setSemanticsPTFCall = (SqlBasicCall) operands.get(0);
+        SqlNodeList partitionKeys = setSemanticsPTFCall.operand(1);
+        SqlNodeList orderKeys = setSemanticsPTFCall.operand(2);
+        //        RexNode subQuery = cx.convertExpression(setSemanticsPTFCall);
+        checkState(orderKeys.isEmpty(), "SESSION table function does not 
support order keys.");
+        RexCall resolvedCall =
+                (RexCall) 
StandardConvertletTable.INSTANCE.convertWindowFunction(cx, fun, call);
+        ImmutableBitSet partitionKeyRefs = getPartitionKeyIndices(cx, 
partitionKeys);
+
+        // attach the partition keys and order keys on the custom rex call
+        resolvedCall =
+                new RexSetSemanticsTableCall(
+                        resolvedCall.getType(),
+                        resolvedCall.getOperator(),
+                        resolvedCall.getOperands(),
+                        partitionKeyRefs,
+                        ImmutableBitSet.builder().build());
+        return resolvedCall;
+    }
+
+    private ImmutableBitSet getPartitionKeyIndices(SqlRexContext cx, 
SqlNodeList partitions) {
+        final ImmutableBitSet.Builder result = ImmutableBitSet.builder();
+
+        for (SqlNode partition : partitions) {
+            RexNode expr = cx.convertExpression(partition);
+            result.set(parseFieldIdx(expr));
+        }
+        return result.build();
+    }
+
+    private static int parseFieldIdx(RexNode e) {
+        switch (e.getKind()) {
+            case FIELD_ACCESS:

Review Comment:
   I was wondering under which condition we might get a `RexFieldAccess`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##########
@@ -19,23 +19,36 @@
 package org.apache.flink.table.planner.calcite;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.functions.sql.SqlSessionTableFunction;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldAccess;
+import org.apache.calcite.rex.RexInputRef;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWindowTableFunction;
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql2rel.SqlRexContext;
 import org.apache.calcite.sql2rel.SqlRexConvertlet;
 import org.apache.calcite.sql2rel.SqlRexConvertletTable;
 import org.apache.calcite.sql2rel.StandardConvertletTable;
+import org.apache.calcite.util.ImmutableBitSet;
 
 import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkState;

Review Comment:
   ```suggestion
   import static org.apache.flink.util.Preconditions.checkArgument;
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/ProjectWindowTableFunctionTransposeRule.java:
##########
@@ -76,15 +78,22 @@ public void onMatch(RelOptRuleCall call) {
         LogicalTableFunctionScan scan = call.rel(1);
         RelNode scanInput = scan.getInput(0);
         TimeAttributeWindowingStrategy windowingStrategy =
-                WindowUtil.convertToWindowingStrategy(
-                        (RexCall) scan.getCall(), scanInput.getRowType());
+                WindowUtil.convertToWindowingStrategy((RexCall) 
scan.getCall(), scanInput);
         // 1. get fields to push down
         ImmutableBitSet projectFields = 
RelOptUtil.InputFinder.bits(project.getProjects(), null);
         int scanInputFieldCount = scanInput.getRowType().getFieldCount();
         ImmutableBitSet toPushFields =
                 ImmutableBitSet.range(0, scanInputFieldCount)
                         .intersect(projectFields)
                         .set(windowingStrategy.getTimeAttributeIndex());
+        // partition keys in session window tvf also need be pushed down
+        if (windowingStrategy.getWindow() instanceof SessionWindowSpec) {
+            SessionWindowSpec sessionWindowSpec = (SessionWindowSpec) 
windowingStrategy.getWindow();
+            int[] partitionKeyIndices = 
sessionWindowSpec.getPartitionKeyIndices();
+            for (int partitionKeyIndex : partitionKeyIndices) {
+                toPushFields = toPushFields.set(partitionKeyIndex);
+            }

Review Comment:
   ```suggestion
               toPushFields = 
toPushFields.union(ImmutableBitSet.of(partitionKeyIndices));
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java:
##########
@@ -160,6 +170,10 @@ public StreamExecWindowAggregate(
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        if (shouldFallbackToGroupWindowAgg(windowing.getWindow())) {
+            return fallbackToGroupWindowAggregate(planner, config);
+        }
+

Review Comment:
   I suggest inlining the condition check since it's straightforward, and 
meanwhile, updating the method name to emphasize that it's falling back to the 
legacy impl
   ```suggestion
          // TODO Currently, the operator of WindowAggregate does not support 
Session Window, and it
           // needs to fall back to the legacy GroupWindowAggregate. See more 
at FLINK-34048.
           if (windowing.getWindow() instanceof SessionWindowSpec) {
               return fallbackToLegacyGroupWindowAggregate(planner, config);
           }
   
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala:
##########
@@ -92,13 +93,57 @@ class FlinkRelMdWindowProperties private extends 
MetadataHandler[FlinkMetadata.W
       return null
     }
 
+    val windowSpec = childProps.getWindowSpec
+    // if the window a
+    if (!validateWindowProperties(windowSpec, mapInToOutPos)) {

Review Comment:
   Nit: `validWindowProperties`. IMO, methods named `validateXXX` usually have 
`void` as the return type. However, if the status is required, then the 
method name should be like `isValidXXX` or `validXXX`.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -392,4 +433,54 @@ object WindowUtil {
     }
     false
   }
+
+  /**
+   * This method only checks the window that contains partition keys like 
session window. The

Review Comment:
   like? I think you might want to use the term "for" or "within"?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##########
@@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final 
SqlCall call) {
                 .makeCall(
                         type, FlinkSqlOperatorTable.TRY_CAST, 
Collections.singletonList(valueRex));
     }
+
+    private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
+        if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
+            return false;
+        }
+        List<SqlNode> operands = call.getOperandList();
+        return !operands.isEmpty() && operands.get(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE;
+    }
+
+    /**
+     * The partition keys and order keys in ptf will be extracted to {@link
+     * RexSetSemanticsTableCall}.
+     *
+     * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '10'
+     * MINUTE)" as an example.
+     *
+     * <pre>
+     *     The original SqlCall:
+     *
+     *     SqlBasicCall: SESSION
+     *     +- SqlBasicCall: SET_SEMANTICS_TABLE
+     *       +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, 
`my_table`.`c`, ... FROM ..."
+     *       +- SqlNodeList: (PARTITION KEY)
+     *          +- SqlIdentifier: "b"
+     *          +- SqlIdentifier: "a"
+     *       +- SqlNodeList: (ORDER KEY)
+     *     +- SqlBasicCall: DESCRIPTOR(`rowtime`)
+     *       +- SqlIdentifier: "rowtime"
+     *     +- SqlInternalLiteral: INTERVAL '5' MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After the process in Calcite:
+     *
+     *     RexCall: SESSION
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After we modified:
+     *
+     *     RexSetSemanticsTableCall: SESSION
+     *     +- PartitionKeys: [0, 1]

Review Comment:
   I think the correct order should be [1, 0]. 



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##########
@@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final 
SqlCall call) {
                 .makeCall(
                         type, FlinkSqlOperatorTable.TRY_CAST, 
Collections.singletonList(valueRex));
     }
+
+    private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
+        if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
+            return false;
+        }
+        List<SqlNode> operands = call.getOperandList();
+        return !operands.isEmpty() && operands.get(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE;
+    }
+
+    /**
+     * The partition keys and order keys in ptf will be extracted to {@link
+     * RexSetSemanticsTableCall}.
+     *
+     * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '10'
+     * MINUTE)" as an example.
+     *
+     * <pre>
+     *     The original SqlCall:
+     *
+     *     SqlBasicCall: SESSION
+     *     +- SqlBasicCall: SET_SEMANTICS_TABLE
+     *       +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, 
`my_table`.`c`, ... FROM ..."
+     *       +- SqlNodeList: (PARTITION KEY)
+     *          +- SqlIdentifier: "b"
+     *          +- SqlIdentifier: "a"
+     *       +- SqlNodeList: (ORDER KEY)
+     *     +- SqlBasicCall: DESCRIPTOR(`rowtime`)
+     *       +- SqlIdentifier: "rowtime"
+     *     +- SqlInternalLiteral: INTERVAL '5' MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After the process in Calcite:
+     *
+     *     RexCall: SESSION
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After we modified:
+     *
+     *     RexSetSemanticsTableCall: SESSION
+     *     +- PartitionKeys: [0, 1]
+     *     +- OrderKeys: []
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     */
+    private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, 
final SqlCall call) {
+        checkState(
+                call.getOperator() instanceof SqlSessionTableFunction,
+                "Currently only support SESSION table function in Set 
Semantics PTF.");
+        SqlSessionTableFunction fun = (SqlSessionTableFunction) 
call.getOperator();
+

Review Comment:
   ```suggestion
           checkArgument(
                   call.getOperator() instanceof SqlSessionTableFunction,
                   "Currently, only the SESSION table function is supported in 
Set Semantics PTF.");
           SqlSessionTableFunction fun = (SqlSessionTableFunction) 
call.getOperator();
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -5335,6 +5364,11 @@ public RexNode convertExpression(SqlNode expr) {
                         root = convertQueryRecursive(query, false, null);
                         return RexSubQuery.multiset(root.rel);
 
+                        //                    case SET_SEMANTICS_TABLE:
+                        //                        call = (SqlCall) expr;
+                        //                        query = call.operand(0);
+                        //                        root = 
convertQueryRecursive(query, false, null);
+                        //                        return 
RexSubQuery.setSemanticsTable(root.rel);

Review Comment:
   Remove this. Nit: compare with the master branch to check the diff, rather than 
the base commit.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -234,11 +235,45 @@ object WindowUtil {
         val step = getOperandAsLong(windowCall.operands(1))
         val maxSize = getOperandAsLong(windowCall.operands(2))
         new CumulativeWindowSpec(Duration.ofMillis(maxSize), 
Duration.ofMillis(step), offset)
+      case FlinkSqlOperatorTable.SESSION =>
+        windowCall match {
+          // with syntax partition key
+          case setSemanticsTableCall: RexSetSemanticsTableCall =>
+            if (!setSemanticsTableCall.getOrderKeys.isEmpty) {
+              throw new ValidationException("Session window TVF doesn't 
support order by clause.")
+            }
+            val gap = getOperandAsLong(windowCall.operands(1))
+            new SessionWindowSpec(
+              Duration.ofMillis(gap),
+              
windowCall.asInstanceOf[RexSetSemanticsTableCall].getPartitionKeys.toArray)
+          // without syntax partition key

Review Comment:
   I'm a little bit confused. IIUC, no matter whether the `partition by` clause is 
present, the query is always translated to `RexSetSemanticsTableCall`. Why 
differentiate the two cases here?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RexSetSemanticsTableCall.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.calcite;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A special {@link RexCall} that is used to represent a table function with 
set semantics. See more
+ * details in {@link 
FlinkConvertletTable#convertSetSemanticsWindowTableFunction}.
+ */
+public class RexSetSemanticsTableCall extends RexCall {
+
+    private final ImmutableBitSet partitionKeys;
+
+    private final ImmutableBitSet orderKeys;
+
+    public RexSetSemanticsTableCall(
+            RelDataType type,
+            SqlOperator operator,
+            List<? extends RexNode> operands,
+            ImmutableBitSet partitionKeys,
+            ImmutableBitSet orderKeys) {
+        super(type, operator, operands);
+        this.partitionKeys = partitionKeys;
+        this.orderKeys = orderKeys;
+    }
+
+    public ImmutableBitSet getPartitionKeys() {
+        return partitionKeys;
+    }
+
+    public ImmutableBitSet getOrderKeys() {
+        return orderKeys;
+    }
+
+    @Override
+    protected String computeDigest(boolean withType) {

Review Comment:
   How about
   ```java
       @Override
       protected String computeDigest(boolean withType) {
           if ((operands.isEmpty()) && (op.getSyntax() == 
SqlSyntax.FUNCTION_ID)) {
               return super.computeDigest(withType);
           }
           final StringBuilder sb = new StringBuilder(op.getName());
           sb.append("(");
           appendKeys(partitionKeys, "PARTITION BY", sb);
           appendKeys(orderKeys, "ORDER BY", sb);
           appendOperands(sb);
           sb.append(")");
           if (withType) {
               sb.append(":");
   
               // NOTE jvs 16-Jan-2005:  for digests, it is very important
               // to use the full type string.
               sb.append(type.getFullTypeString());
           }
           return sb.toString();
       }
   
       private void appendKeys(ImmutableBitSet keys, String prefix, 
StringBuilder sb) {
           sb.append(
                   keys.asList().stream()
                           .map(key -> "$" + key)
                           .collect(Collectors.joining(",", prefix + "(", "), 
")));
       }
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##########
@@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final 
SqlCall call) {
                 .makeCall(
                         type, FlinkSqlOperatorTable.TRY_CAST, 
Collections.singletonList(valueRex));
     }
+
+    private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
+        if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
+            return false;
+        }
+        List<SqlNode> operands = call.getOperandList();
+        return !operands.isEmpty() && operands.get(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE;
+    }
+
+    /**
+     * The partition keys and order keys in ptf will be extracted to {@link
+     * RexSetSemanticsTableCall}.
+     *
+     * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '10'
+     * MINUTE)" as an example.
+     *
+     * <pre>
+     *     The original SqlCall:
+     *
+     *     SqlBasicCall: SESSION
+     *     +- SqlBasicCall: SET_SEMANTICS_TABLE
+     *       +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, 
`my_table`.`c`, ... FROM ..."
+     *       +- SqlNodeList: (PARTITION KEY)
+     *          +- SqlIdentifier: "b"
+     *          +- SqlIdentifier: "a"
+     *       +- SqlNodeList: (ORDER KEY)
+     *     +- SqlBasicCall: DESCRIPTOR(`rowtime`)
+     *       +- SqlIdentifier: "rowtime"
+     *     +- SqlInternalLiteral: INTERVAL '5' MINUTE

Review Comment:
   ```suggestion
        * Due to CALCITE-6204, we need to manually extract partition keys and 
order keys and convert
        * them to {@link RexSetSemanticsTableCall}.
        *
        * <p>Take `SESSION(TABLE my_table PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '10'
        * MINUTE)` as an example.
        *
        * <p>The original SqlNode tree after syntax parse looks like
        *
        * <pre>
        * SqlBasicCall: SESSION
        * ├─ SqlBasicCall: SET_SEMANTICS_TABLE
        * │  ├─ SqlSelect: "SELECT ... FROM ..."
        * │  ├─ SqlNodeList: (PARTITION KEY)
        * │  │  ├─ SqlIdentifier: "b"
        * │  │  └─ SqlIdentifier: "a"
        * │  └─ SqlNodeList: (ORDER KEY)
        * ├─ SqlBasicCall: DESCRIPTOR(`rowtime`)
        * │  └─ SqlIdentifier: "rowtime"
        * └─ SqlInternalLiteral: INTERVAL '5' MINUTE
        * </pre>
        *
        * <p>Calcite will skip the first operand of SESSION operator, which 
leads to the following
        * wrong rex call
        *
        * <pre>
        * RexCall: SESSION
        * ├─ RexCall: DESCRIPTOR(`rowtime`)
        * │  └─ RexFieldAccess: `rowtime`
        * └─ RexLiteral: 300000:INTERVAL MINUTE
        * </pre>
        *
        * <p>As a workaround, we flatten the inner sql call and convert it to a 
customized {@link
        * RexSetSemanticsTableCall} to preserve partition keys and order keys
        *
        * <pre>
        * RexSetSemanticsTableCall: SESSION
        * ├─ PartitionKeys: [1, 0]
        * ├─ OrderKeys: []
        * ├─ RexCall: DESCRIPTOR(`rowtime`)
        * │  └─ RexFieldAccess: `rowtime`
        * └─ RexLiteral: 300000:INTERVAL MINUTE
        * </pre>
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala:
##########
@@ -92,13 +93,57 @@ class FlinkRelMdWindowProperties private extends 
MetadataHandler[FlinkMetadata.W
       return null
     }
 
+    val windowSpec = childProps.getWindowSpec
+    // if the window a
+    if (!validateWindowProperties(windowSpec, mapInToOutPos)) {
+      // If the window becomes illegal after passing through calc or project,
+      // return no window properties
+      return null
+    }
+
     childProps.copy(
       transformColumnIndex(childProps.getWindowStartColumns, mapInToOutPos),
       transformColumnIndex(childProps.getWindowEndColumns, mapInToOutPos),
-      transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos)
+      transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos),
+      updateWindowSpec(windowSpec, mapInToOutPos)
     )
   }
 
+  /**
+   * Validate the window properties when passing through calc or project.
+   *
+   * This method only checks the window that contains partition keys like 
session window. See more

Review Comment:
   like => for?



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -1432,9 +1431,17 @@ private void substituteSubQuery(Blackboard bb, SubQuery 
subQuery) {
                 bb.cursors.add(converted.r);
                 return;
             case SET_SEMANTICS_TABLE:
-                if (!config.isExpand()) {
-                    return;
-                }
+                // ----- FLINK MODIFICATION BEGIN -----
+                // For two reasons, we always expand SET SEMANTICS TABLE
+                // 1. Calcite has a bug when not expanding the SET SEMANTICS 
TABLE. See more in
+                // CALCITE-6204.
+                // 2. Currently, Flink’s built-in Session Window TVF is the 
only SET SEMANTICS
+                // TABLE. We will expand it by default like other built-in 
window TVFs to reuse some
+                // subsequent processing logic and optimization logic.
+                // if (!config.isExpand()) {
+                //     return;
+                // }

Review Comment:
   ```suggestion
                   // We always expand the SET SEMANTICS TABLE for two reasons:
                   // 1. Calcite has a bug when not expanding the SET SEMANTICS 
TABLE. For more
                   // information, see CALCITE-6204.
                   // 2. Currently, Flink’s built-in Session Window TVF is the 
only PTF with SET
                   // SEMANTICS. We will expand it by default, like other 
built-in window TVFs, to
                   // reuse some subsequent processing and optimization logic.
                   // if (!config.isExpand()) {
                   //     return;
                   // }
   ```



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdWindowProperties.scala:
##########
@@ -92,13 +93,57 @@ class FlinkRelMdWindowProperties private extends 
MetadataHandler[FlinkMetadata.W
       return null
     }
 
+    val windowSpec = childProps.getWindowSpec
+    // if the window a
+    if (!validateWindowProperties(windowSpec, mapInToOutPos)) {
+      // If the window becomes illegal after passing through calc or project,
+      // return no window properties
+      return null
+    }
+
     childProps.copy(
       transformColumnIndex(childProps.getWindowStartColumns, mapInToOutPos),
       transformColumnIndex(childProps.getWindowEndColumns, mapInToOutPos),
-      transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos)
+      transformColumnIndex(childProps.getWindowTimeColumns, mapInToOutPos),
+      updateWindowSpec(windowSpec, mapInToOutPos)
     )
   }
 
+  /**
+   * Validate the window properties when passing through calc or project.
+   *
+   * This method only checks the window that contains partition keys like 
session window. See more
+   * at [[WindowUtil.validatePartitionKeyIfNecessary]]
+   */
+  private def validateWindowProperties(
+      windowSpec: WindowSpec,
+      mapInToOutPos: JHashMap[Int, JList[Int]]): Boolean = {
+    windowSpec match {
+      case session: SessionWindowSpec =>
+        val oldPartitionKeys = session.getPartitionKeyIndices
+        oldPartitionKeys.forall(oldPartitionKey => 
mapInToOutPos.contains(oldPartitionKey))
+      case _ => true
+    }
+  }
+
+  /** Update partition key indices in session window spec according to the 
projection map. */
+  private def updateWindowSpec(
+      oldWindowSpec: WindowSpec,
+      mapInToOutPos: JHashMap[Int, JList[Int]]): WindowSpec = {
+    oldWindowSpec match {
+      case session: SessionWindowSpec =>
+        val oldPartitionKeys = session.getPartitionKeyIndices
+        val newPartitionKeys = oldPartitionKeys.map(
+          oldPartitionKey => {
+            val newPartitionKey = mapInToOutPos.get(oldPartitionKey)
+            Preconditions.checkState(newPartitionKey.size == 1)

Review Comment:
   Nit: checkArgument?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/PullUpWindowTableFunctionIntoWindowAggregateRule.scala:
##########
@@ -146,6 +152,15 @@ class PullUpWindowTableFunctionIntoWindowAggregateRule
 
     call.transformTo(newWindowAgg)
   }
+
+  private def unwrapRel(rel: RelNode): RelNode = {

Review Comment:
   Nit: add `@tailrec` annotation



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -317,11 +352,17 @@ object WindowUtil {
   /**
    * For rowtime window, return true if the given aggregate grouping contains 
window start and end.
    * For proctime window, we should also check if it exists a neighbour 
windowTableFunctionCall.
+   *
+   * If the window is a session window, we should also check if the partition 
keys are the same as
+   * the group keys. See more at 
[[WindowUtil.validateGroupKeyPartitionKeyIfNecessary()]].
    */
   def isValidWindowAggregate(agg: FlinkLogicalAggregate): Boolean = {
     val fmq = 
FlinkRelMetadataQuery.reuseOrCreate(agg.getCluster.getMetadataQuery)
     val windowProperties = fmq.getRelWindowProperties(agg.getInput)
     val grouping = agg.getGroupSet
+    if (!validateGroupKeyPartitionKeyIfNecessary(grouping, windowProperties)) {

Review Comment:
   `validGroupKeyPartitionKeyIfNecessary`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to