xuyangzhong commented on code in PR #23505:
URL: https://github.com/apache/flink/pull/23505#discussion_r1462653012


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkConvertletTable.java:
##########
@@ -88,4 +106,106 @@ private RexNode convertTryCast(SqlRexContext cx, final 
SqlCall call) {
                 .makeCall(
                         type, FlinkSqlOperatorTable.TRY_CAST, 
Collections.singletonList(valueRex));
     }
+
+    private boolean isSetSemanticsWindowTableFunction(SqlCall call) {
+        if (!(call.getOperator() instanceof SqlWindowTableFunction)) {
+            return false;
+        }
+        List<SqlNode> operands = call.getOperandList();
+        return !operands.isEmpty() && operands.get(0).getKind() == 
SqlKind.SET_SEMANTICS_TABLE;
+    }
+
+    /**
+     * The partition keys and order keys in ptf will be extracted to {@link
+     * RexSetSemanticsTableCall}.
+     *
+     * <p>Take "SESSION(TABLE my_table PARTITION BY (b, a), 
DESCRIPTOR(rowtime), INTERVAL '5'
+     * MINUTE)" as an example.
+     *
+     * <pre>
+     *     The original SqlCall:
+     *
+     *     SqlBasicCall: SESSION
+     *     +- SqlBasicCall: SET_SEMANTICS_TABLE
+     *       +- SqlSelect: "SELECT `my_table`.`a`, `my_table`.`b`, 
`my_table`.`c`, ... FROM ..."
+     *       +- SqlNodeList: (PARTITION KEY)
+     *          +- SqlIdentifier: "b"
+     *          +- SqlIdentifier: "a"
+     *       +- SqlNodeList: (ORDER KEY)
+     *     +- SqlBasicCall: DESCRIPTOR(`rowtime`)
+     *       +- SqlIdentifier: "rowtime"
+     *     +- SqlInternalLiteral: INTERVAL '5' MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After the process in Calcite:
+     *
+     *     RexCall: SESSION
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     *
+     * <pre>
+     *     After we modified:
+     *
+     *     RexSetSemanticsTableCall: SESSION
+     *     +- PartitionKeys: [0, 1]
+     *     +- OrderKeys: []
+     *     +- RexCall: DESCRIPTOR(`rowtime`)
+     *       +- RexFieldAccess: `rowtime`
+     *     +- RexLiteral: 300000:INTERVAL MINUTE
+     * </pre>
+     */
+    private RexNode convertSetSemanticsWindowTableFunction(SqlRexContext cx, 
final SqlCall call) {
+        checkState(
+                call.getOperator() instanceof SqlSessionTableFunction,
+                "Currently only support SESSION table function in Set 
Semantics PTF.");
+        SqlSessionTableFunction fun = (SqlSessionTableFunction) 
call.getOperator();
+
+        List<SqlNode> operands = call.getOperandList();
+
+        SqlBasicCall setSemanticsPTFCall = (SqlBasicCall) operands.get(0);
+        SqlNodeList partitionKeys = setSemanticsPTFCall.operand(1);
+        SqlNodeList orderKeys = setSemanticsPTFCall.operand(2);
+        //        RexNode subQuery = cx.convertExpression(setSemanticsPTFCall);
+        checkState(orderKeys.isEmpty(), "SESSION table function does not 
support order keys.");
+        RexCall resolvedCall =
+                (RexCall) 
StandardConvertletTable.INSTANCE.convertWindowFunction(cx, fun, call);
+        ImmutableBitSet partitionKeyRefs = getPartitionKeyIndices(cx, 
partitionKeys);
+
+        // attach the partition keys and order keys on the custom rex call
+        resolvedCall =
+                new RexSetSemanticsTableCall(
+                        resolvedCall.getType(),
+                        resolvedCall.getOperator(),
+                        resolvedCall.getOperands(),
+                        partitionKeyRefs,
+                        ImmutableBitSet.builder().build());
+        return resolvedCall;
+    }
+
+    private ImmutableBitSet getPartitionKeyIndices(SqlRexContext cx, 
SqlNodeList partitions) {
+        final ImmutableBitSet.Builder result = ImmutableBitSet.builder();
+
+        for (SqlNode partition : partitions) {
+            RexNode expr = cx.convertExpression(partition);
+            result.set(parseFieldIdx(expr));
+        }
+        return result.build();
+    }
+
+    private static int parseFieldIdx(RexNode e) {
+        switch (e.getKind()) {
+            case FIELD_ACCESS:

Review Comment:
   I read the Javadoc of RexFieldAccess: it is used for correlated field 
references, so the `FIELD_ACCESS` case should be deleted here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to