LadyForest commented on code in PR #23505:
URL: https://github.com/apache/flink/pull/23505#discussion_r1448461452


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -234,11 +238,71 @@ object WindowUtil {
         val step = getOperandAsLong(windowCall.operands(1))
         val maxSize = getOperandAsLong(windowCall.operands(2))
         new CumulativeWindowSpec(Duration.ofMillis(maxSize), 
Duration.ofMillis(step), offset)
+      case FlinkSqlOperatorTable.SESSION =>
+        val gap = getOperandAsLong(windowCall.operands(1))
+        val partitionKeys =
+          exploreSessionWindowPartitionKeys(scanInput)
+        new SessionWindowSpec(Duration.ofMillis(gap), partitionKeys)
     }
 
     new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType, 
timeIndex)
   }
 
+  /**
+   * If the session window tvf has partition keys, the whole tree is like:
+   *
+   * {{{

Review Comment:
   Nit
   ```scala
   {{{
      *          TableFunctionScan
      *                  |
      *          [Project or Calc]
      *                  |
      *               Exchange
      * }}}
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -1432,9 +1433,15 @@ private void substituteSubQuery(Blackboard bb, SubQuery 
subQuery) {
                 bb.cursors.add(converted.r);
                 return;
             case SET_SEMANTICS_TABLE:
-                if (!config.isExpand()) {
-                    return;
-                }
+                // ----- FLINK MODIFICATION BEGIN -----
+                // Currently, Flink will not distinguish tvf between SET 
semantics and ROW
+                // semantics.
+                // And in Flink, only session window tvf need to support SET 
semantics. It will
+                // always be expanded.
+                //  if (!config.isExpand()) {
+                //      return;
+                //  }

Review Comment:
   Can we defer the sub-query rewrite to the physical phase?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java:
##########
@@ -144,7 +145,8 @@ public RelNode visit(RelNode node) {
                 || node instanceof FlinkLogicalSort
                 || node instanceof FlinkLogicalOverAggregate
                 || node instanceof FlinkLogicalExpand
-                || node instanceof FlinkLogicalScriptTransform) {
+                || node instanceof FlinkLogicalScriptTransform
+                || node instanceof FlinkLogicalExchange) {

Review Comment:
   I think we don't need to involve `FlinkLogicalExchange` here if we can defer 
the sub-query rewrite to the physical phase



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java:
##########
@@ -249,4 +263,53 @@ private GeneratedNamespaceAggsHandleFunction<Long> 
createAggsHandler(
                 sliceAssigner,
                 shiftTimeZone);
     }
+
+    /**
+     * Currently, the operator of WindowAggregate does not support Session 
Window and it needs to
+     * fall back to the legacy GroupWindowAggregate.
+     */
+    private boolean shouldFallbackToGroupWindowAgg(WindowSpec windowSpec) {
+        return windowSpec instanceof SessionWindowSpec;
+    }
+
+    private Transformation<RowData> fallbackToGroupWindowAggregate(
+            PlannerBase planner, ExecNodeConfig config) {
+        Preconditions.checkState(windowing.getWindow() instanceof 
SessionWindowSpec);
+
+        if (windowing instanceof TimeAttributeWindowingStrategy) {
+            LogicalType timeAttributeType = windowing.getTimeAttributeType();
+            LogicalWindow logicalWindow =
+                    new SessionGroupWindow(
+                            new WindowReference("w$", timeAttributeType),
+                            new FieldReferenceExpression(
+                                    // mock an empty time field name here
+                                    "",
+                                    
fromLogicalTypeToDataType(timeAttributeType),
+                                    0,
+                                    ((TimeAttributeWindowingStrategy) 
windowing)
+                                            .getTimeAttributeIndex()),
+                            intervalOfMillis(
+                                    ((SessionWindowSpec) windowing.getWindow())
+                                            .getGap()
+                                            .toMillis()));
+
+            StreamExecGroupWindowAggregate groupWindowAggregate =
+                    new StreamExecGroupWindowAggregate(
+                            planner.getTableConfig(),
+                            grouping,
+                            aggCalls,
+                            logicalWindow,
+                            namedWindowProperties,
+                            false,
+                            InputProperty.DEFAULT,
+                            (RowType) getOutputType(),
+                            getDescription());
+
+            groupWindowAggregate.setInputEdges(getInputEdges());
+            return groupWindowAggregate.translateToPlanInternal(planner, 
config);
+        }
+
+        throw new UnsupportedOperationException(
+                "Unsupported windowing strategy: " + windowing.getClass());

Review Comment:
   Nit: `String.format("Unsupported windowing strategy: %s for session 
window.", windowing.getClass)`



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.logical;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.TimeUtils.formatWithHighestUnit;
+
+/** Logical representation of a session window specification. */
+@JsonTypeName("SessionWindow")
+public class SessionWindowSpec implements WindowSpec {
+    public static final String FIELD_NAME_GAP = "gap";
+    public static final String FIELD_NAME_PARTITION_KEYS = 
"partition_key_indices";
+
+    @JsonProperty(FIELD_NAME_GAP)
+    private final Duration gap;
+
+    @JsonProperty(FIELD_NAME_PARTITION_KEYS)

Review Comment:
   Is there any reason not to include the `orderBy` list?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/logical/SessionWindowSpec.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.logical;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.TimeUtils.formatWithHighestUnit;
+
+/** Logical representation of a session window specification. */
+@JsonTypeName("SessionWindow")
+public class SessionWindowSpec implements WindowSpec {
+    public static final String FIELD_NAME_GAP = "gap";
+    public static final String FIELD_NAME_PARTITION_KEYS = 
"partition_key_indices";
+
+    @JsonProperty(FIELD_NAME_GAP)
+    private final Duration gap;
+
+    @JsonProperty(FIELD_NAME_PARTITION_KEYS)
+    private final int[] partitionKeyIndices;
+
+    @JsonCreator
+    public SessionWindowSpec(
+            @JsonProperty(FIELD_NAME_GAP) Duration gap,
+            @JsonProperty(FIELD_NAME_PARTITION_KEYS) int[] 
partitionKeyIndices) {
+        this.gap = checkNotNull(gap);
+        this.partitionKeyIndices = checkNotNull(partitionKeyIndices);
+    }
+
+    @Override
+    public String toSummaryString(String windowing, String[] inputFieldNames) {
+        if (partitionKeyIndices.length == 0) {
+            return String.format("SESSION(%s, gap=[%s])", windowing, 
formatWithHighestUnit(gap));
+        } else {
+            String[] partitionKeyNames =
+                    IntStream.of(partitionKeyIndices)
+                            .mapToObj(idx -> inputFieldNames[idx])
+                            .toArray(String[]::new);
+            return String.format(
+                    "SESSION(%s, gap=[%s], partition keys=%s)",
+                    windowing, formatWithHighestUnit(gap), 
Arrays.toString(partitionKeyNames));

Review Comment:
   What about the `orderBy` list?



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java:
##########
@@ -77,6 +79,11 @@ protected CommonExecWindowTableFunction(
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        if (windowingStrategy.getWindow() instanceof SessionWindowSpec) {
+            // TODO introduce session window tvf op instead of falling back to 
group window

Review Comment:
   Correct me if I'm wrong, but the session window agg will not hit this branch?
   I think adding a TODO is ok, but there is no need to throw an exception 
here. Otherwise, it might be misleading.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalExchange.scala:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.logical
+
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptCost, 
RelOptPlanner, RelOptRule, RelTraitSet}
+import org.apache.calcite.rel.{RelDistribution, RelNode}
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
+import org.apache.calcite.rel.core.Exchange
+import org.apache.calcite.rel.logical.LogicalExchange
+import org.apache.calcite.rel.metadata.RelMetadataQuery
+import org.apache.calcite.util.Util
+
+/**
+ * Sub-class of [[Exchange]] that is a relational expression which imposes a 
particular distribution

Review Comment:
   We may not need to implement this.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalExchangeRule.scala:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.rules.physical.stream
+
+import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, 
FlinkRelDistributionTraitDef}
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalExchange
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalExchange
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.convert.ConverterRule.Config
+
+/** Rule that converts [[FlinkLogicalExchange]] to [[StreamPhysicalExchange]]. 
*/
+class StreamPhysicalExchangeRule(config: Config) extends ConverterRule(config) 
{
+
+  override def convert(rel: RelNode): RelNode = {
+    val exchange = rel.asInstanceOf[FlinkLogicalExchange]
+    val flinkRelDistribution = 
FlinkRelDistribution.convertFrom(exchange.getDistribution)
+    val newTrait =
+      rel.getTraitSet
+        .replace(FlinkConventions.STREAM_PHYSICAL)
+        
.replace(FlinkRelDistributionTraitDef.INSTANCE.canonize(flinkRelDistribution))
+    val newInput = RelOptRule.convert(exchange.getInput, 
FlinkConventions.STREAM_PHYSICAL)
+    new StreamPhysicalExchange(rel.getCluster, newTrait, newInput, 
flinkRelDistribution)
+  }
+}
+
+object StreamPhysicalExchangeRule {

Review Comment:
   If we can defer the sub-query rewrite to the physical phase, we might not 
need this rule?



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -1432,9 +1433,15 @@ private void substituteSubQuery(Blackboard bb, SubQuery 
subQuery) {
                 bb.cursors.add(converted.r);
                 return;
             case SET_SEMANTICS_TABLE:
-                if (!config.isExpand()) {
-                    return;
-                }
+                // ----- FLINK MODIFICATION BEGIN -----
+                // Currently, Flink will not distinguish tvf between SET 
semantics and ROW
+                // semantics.
+                // And in Flink, only session window tvf need to support SET 
semantics. It will
+                // always be expanded.
+                //  if (!config.isExpand()) {
+                //      return;
+                //  }

Review Comment:
   What if we change the logic in `StandardConvertletTable` as a workaround?
   
   For `convertWindowFunction`, we can keep the unflattened partition list and 
convert it to the rex call.
   
   In `SqlToRelConverter` 
   
   we can extend `convertExpression` to cope with `SET_SEMANTICS_TABLE` kind.
   
   E.g.
   One possible way is to hack the switch logic
   ```java
   case SET_SEMANTICS_TABLE:
             SqlNode partitionList = ((SqlCall)expr).operand(1);
             SqlNode orderByList = ((SqlCall)expr).operand(2);
             // convert it to SqlWindow??
            
   ```
   Another way is to modify `convertExtendedExpression` and directly return the 
converted rex call.



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala:
##########
@@ -234,11 +238,71 @@ object WindowUtil {
         val step = getOperandAsLong(windowCall.operands(1))
         val maxSize = getOperandAsLong(windowCall.operands(2))
         new CumulativeWindowSpec(Duration.ofMillis(maxSize), 
Duration.ofMillis(step), offset)
+      case FlinkSqlOperatorTable.SESSION =>
+        val gap = getOperandAsLong(windowCall.operands(1))
+        val partitionKeys =
+          exploreSessionWindowPartitionKeys(scanInput)
+        new SessionWindowSpec(Duration.ofMillis(gap), partitionKeys)
     }
 
     new TimeAttributeWindowingStrategy(windowSpec, timeAttributeType, 
timeIndex)
   }
 
+  /**
+   * If the session window tvf has partition keys, the whole tree is like:
+   *
+   * {{{
+   *          TableFunctionScan
+   *                  |
+   *          Project / Calc (optional)
+   *                 |
+   *              Exchange
+   * }}}
+   */
+  private def exploreSessionWindowPartitionKeys(scanInput: RelNode): 
Array[Int] = {
+    var input = unwrapHepRelVertexOrRelSubSet(scanInput)
+    // when transpose project or calc, the indices of the partition keys will 
change
+    var indexMapper: TargetMapping = null
+    input = input match {
+      case project: Project =>
+        indexMapper = project.getMapping
+        unwrapHepRelVertexOrRelSubSet(input.getInput(0))
+      case calc: Calc =>
+        val calcProgram = calc.getProgram
+        val projects = calcProgram.getProjectList
+        val inputSize = calcProgram.getInputRowType.getFieldNames.size()
+        indexMapper =
+          Project.getMapping(inputSize, projects.map(p => 
calcProgram.expandLocalRef(p)).toList)
+        unwrapHepRelVertexOrRelSubSet(input.getInput(0))
+      case _ => input
+    }
+
+    input match {
+      case exchange: Exchange =>
+        val partitionKey = exchange.getDistribution.getKeys
+        val originalIndices = 
JavaScalaConversionUtil.toScala(partitionKey).map(_.toInt).toArray
+        if (indexMapper != null) {
+          originalIndices.map(
+            v => {
+              indexMapper.getTarget(v)
+            })
+        } else {
+          originalIndices
+        }
+      case _ =>
+        Array.empty[Int]
+    }
+
+  }
+
+  private def unwrapHepRelVertexOrRelSubSet(node: RelNode): RelNode = {

Review Comment:
   Nit: How about `unwrapCurrentRel`? It is more concise.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to