twalthr commented on code in PR #25753:
URL: https://github.com/apache/flink/pull/25753#discussion_r2007913786


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java:
##########
@@ -183,15 +190,15 @@ protected Transformation<RowData> translateToPlanInternal(
 
         final int orderKey = orderKeys[0];
         final LogicalType orderKeyType = 
inputRowType.getFields().get(orderKey).getType();
-        // check time field && identify window rowtime attribute
-        final int rowTimeIdx;
+        // check time field && identify window time attribute
+        TimeAttribute timeAttribute;
         if (isRowtimeAttribute(orderKeyType)) {
-            rowTimeIdx = orderKey;
+            timeAttribute = TimeAttribute.ROW_TIME;
         } else if (isProctimeAttribute(orderKeyType)) {
-            rowTimeIdx = -1;
+            timeAttribute = TimeAttribute.PROC_TIME;
         } else {
-            throw new TableException(
-                    "OVER windows' ordering in stream mode must be defined on 
a time attribute.");
+            timeAttribute = TimeAttribute.NON_TIME;
+            LOG.info("Non-time attribute window detected");

Review Comment:
   It's uncommon for us to log such minor properties within the planner. I 
recommend dropping it.
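
For illustration only, the classification in this hunk could be sketched without the log statement as below. This is a minimal, dependency-free sketch and not the PR's code: the class `TimeAttributeClassifierSketch`, the method `classify`, and its two boolean parameters are hypothetical stand-ins for `isRowtimeAttribute(orderKeyType)` and `isProctimeAttribute(orderKeyType)`; only the enum constant names are taken from the diff.

```java
final class TimeAttributeClassifierSketch {

    /** Constant names mirror the diff; the enum itself is redeclared here as a stand-in. */
    enum TimeAttribute {
        ROW_TIME,
        PROC_TIME,
        NON_TIME
    }

    /**
     * Hypothetical helper. The flags stand in for the planner's
     * isRowtimeAttribute(...) and isProctimeAttribute(...) checks.
     */
    static TimeAttribute classify(boolean isRowtime, boolean isProctime) {
        if (isRowtime) {
            return TimeAttribute.ROW_TIME;
        } else if (isProctime) {
            return TimeAttribute.PROC_TIME;
        }
        // No logging here: the chosen attribute is visible from the resulting plan.
        return TimeAttribute.NON_TIME;
    }

    public static void main(String[] args) {
        System.out.println(classify(true, false));
        System.out.println(classify(false, true));
        System.out.println(classify(false, false));
    }
}
```

Returning the value from a helper would also allow the local variable to be declared `final`, as `rowTimeIdx` was before the change.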



##########
docs/content/docs/dev/table/sql/queries/over-agg.md:
##########
@@ -70,11 +70,20 @@ There are two options to define the range, `ROWS` intervals 
and `RANGE` interval
 
 #### RANGE intervals
 
-A `RANGE` interval is defined on the values of the ORDER BY column, which is 
in case of Flink always a time attribute. The following RANGE interval defines 
that all rows with a time attribute of at most 30 minutes less than the current 
row are included in the aggregate.
+A `RANGE` interval is defined on the values of the ORDER BY column, which is 
in case of Flink is either a time or non-time attribute.
+
+The following RANGE interval defines that all rows with a time attribute of at 
most 30 minutes less than the current row are included in the aggregate.
 
 ```sql
 RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
 ```
+The following RANGE interval defines that all rows with a non-time attribute 
of unbounded rows preceding the current row are included in the aggregate.
+```sql
+RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+```
+

Review Comment:
   Remove the empty lines.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverWindowRestoreTest.java:
##########
@@ -36,7 +36,7 @@ public OverWindowRestoreTest() {
     @Override
     protected Stream<String> getSavepointPaths(
             TableTestProgram program, ExecNodeMetadata metadata) {
-        if (metadata.version() == 1) {
+        if (metadata.version() == 1 && 
program.equals(OverWindowTestPrograms.LAG_OVER_FUNCTION)) {

Review Comment:
   Can you add a comment for this change? Just looking at the code, this seems odd.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java:
##########
@@ -309,73 +320,145 @@ private KeyedProcessFunction<RowData, RowData, RowData> 
createUnboundedOverProce
                         aggInputRowType,
                         JavaScalaConversionUtil.toScala(aggCalls),
                         new boolean[aggCalls.size()],
-                        false, // needRetraction
+                        false, // needInputCount
                         true, // isStateBackendDataViews
                         true); // needDistinctInfo
 
         LogicalType[] fieldTypes = inputRowType.getChildren().toArray(new 
LogicalType[0]);
-        AggsHandlerCodeGenerator generator =
+
+        AggsHandlerCodeGenerator aggsGenerator =
                 new AggsHandlerCodeGenerator(
                         ctx,
                         relBuilder,
                         
JavaScalaConversionUtil.toScala(Arrays.asList(fieldTypes)),
                         false); // copyInputField
 
-        GeneratedAggsHandleFunction genAggsHandler =
-                generator
+        aggsGenerator =
+                aggsGenerator
                         .needAccumulate()
                         // over agg code gen must pass the constants
-                        
.withConstants(JavaScalaConversionUtil.toScala(constants))
-                        .generateAggsHandler("UnboundedOverAggregateHelper", 
aggInfoList);
+                        
.withConstants(JavaScalaConversionUtil.toScala(constants));
+
+        GeneratedAggsHandleFunction genAggsHandler =
+                
aggsGenerator.generateAggsHandler("UnboundedOverAggregateHelper", aggInfoList);
 
         LogicalType[] flattenAccTypes =
                 Arrays.stream(aggInfoList.getAccTypes())
                         
.map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
                         .toArray(LogicalType[]::new);
 
-        if (rowTimeIdx >= 0) {
-            switch (unboundedOverVersion) {
-                // Currently there is no migration path between first and 
second versions.
-                case AbstractRowTimeUnboundedPrecedingOver.FIRST_OVER_VERSION:
-                    if (isRowsClause) {
-                        // ROWS unbounded over process function
-                        return new RowTimeRowsUnboundedPrecedingFunction<>(
+        switch (timeAttribute) {
+            case ROW_TIME:
+                final int rowTimeIdx = orderKeys[0];
+                switch (unboundedOverVersion) {
+                    // Currently there is no migration path between first and 
second versions.
+                    case 
AbstractRowTimeUnboundedPrecedingOver.FIRST_OVER_VERSION:
+                        if (isRowsClause) {
+                            // ROWS unbounded over process function
+                            return new RowTimeRowsUnboundedPrecedingFunction<>(
+                                    config.getStateRetentionTime(),
+                                    
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                                    genAggsHandler,
+                                    flattenAccTypes,
+                                    fieldTypes,
+                                    rowTimeIdx);
+                        } else {
+                            // RANGE unbounded over process function
+                            return new 
RowTimeRangeUnboundedPrecedingFunction<>(
+                                    config.getStateRetentionTime(),
+                                    
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                                    genAggsHandler,
+                                    flattenAccTypes,
+                                    fieldTypes,
+                                    rowTimeIdx);
+                        }
+                    case 
RowTimeUnboundedPrecedingOverFunctionV2.SECOND_OVER_VERSION:
+                        return new RowTimeUnboundedPrecedingOverFunctionV2<>(
+                                isRowsClause,
                                 config.getStateRetentionTime(),
                                 
TableConfigUtils.getMaxIdleStateRetentionTime(config),
                                 genAggsHandler,
                                 flattenAccTypes,
                                 fieldTypes,
                                 rowTimeIdx);
-                    } else {
-                        // RANGE unbounded over process function
-                        return new RowTimeRangeUnboundedPrecedingFunction<>(
-                                config.getStateRetentionTime(),
-                                
TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                                genAggsHandler,
-                                flattenAccTypes,
-                                fieldTypes,
-                                rowTimeIdx);
-                    }
-                case 
RowTimeUnboundedPrecedingOverFunctionV2.SECOND_OVER_VERSION:
-                    return new RowTimeUnboundedPrecedingOverFunctionV2<>(
-                            isRowsClause,
-                            config.getStateRetentionTime(),
-                            
TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                            genAggsHandler,
-                            flattenAccTypes,
-                            fieldTypes,
-                            rowTimeIdx);
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unsupported unbounded over version: "
-                                    + unboundedOverVersion
-                                    + ". Valid versions are 1 and 2.");
-            }
-        } else {
-            return new ProcTimeUnboundedPrecedingFunction<>(
-                    
StateConfigUtil.createTtlConfig(config.getStateRetentionTime()),
-                    genAggsHandler,
-                    flattenAccTypes);
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unsupported unbounded over version: "
+                                        + unboundedOverVersion
+                                        + ". Valid versions are 1 and 2.");
+                }
+            case PROC_TIME:
+                return new ProcTimeUnboundedPrecedingFunction<>(
+                        
StateConfigUtil.createTtlConfig(config.getStateRetentionTime()),
+                        genAggsHandler,
+                        flattenAccTypes);
+            case NON_TIME:
+                if (isRowsClause) {
+                    // Non-Time Rows Unbounded Preceding Function
+                    throw new TableException(
+                            "Non-Time Rows Unbounded Preceding Function not 
supported yet.");

Review Comment:
   ```suggestion
                               "OVER windows with UNBOUNDED PRECEDING are not 
supported when sorting on a non-time attribute column.");
   ```



##########
docs/content/docs/dev/table/sql/queries/over-agg.md:
##########
@@ -56,7 +56,7 @@ You can define multiple `OVER` window aggregates in a 
`SELECT` clause. However,
 
 ### ORDER BY
 
-`OVER` windows are defined on an ordered sequence of rows. Since tables do not 
have an inherent order, the `ORDER BY` clause is mandatory. For streaming 
queries, Flink currently only supports `OVER` windows that are defined with an 
ascending [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" 
>}}) order. Additional orderings are not supported.
+`OVER` windows are defined on an ordered sequence of rows. Since tables do not 
have an inherent order, the `ORDER BY` clause is mandatory. For streaming 
queries, Flink currently supports `OVER` windows that are defined with an 
ascending [time attributes]({{< ref "docs/dev/table/concepts/time_attributes" 
>}}) order or ascending non-time attributes. Additional orderings are not 
supported.

Review Comment:
   ```suggestion
   `OVER` windows are defined on an ordered sequence of rows. Since tables do 
not have an inherent order, the `ORDER BY` clause is mandatory. For streaming 
queries, Flink currently supports `OVER` windows that are defined with an 
ascending [time attribute]({{< ref "docs/dev/table/concepts/time_attributes" 
>}}) or ascending non-time attribute. Additional orderings are not supported.
   ```



##########
docs/content/docs/dev/table/sql/queries/over-agg.md:
##########
@@ -70,11 +70,20 @@ There are two options to define the range, `ROWS` intervals 
and `RANGE` interval
 
 #### RANGE intervals
 
-A `RANGE` interval is defined on the values of the ORDER BY column, which is 
in case of Flink always a time attribute. The following RANGE interval defines 
that all rows with a time attribute of at most 30 minutes less than the current 
row are included in the aggregate.
+A `RANGE` interval is defined on the values of the ORDER BY column, which is 
in case of Flink is either a time or non-time attribute.

Review Comment:
   ```suggestion
   A `RANGE` interval is defined on the values of the ORDER BY column, which 
(in case of Flink) is either a time or non-time attribute.
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecOverAggregate.java:
##########
@@ -439,33 +524,44 @@ private KeyedProcessFunction<RowData, RowData, RowData> 
createBoundedOverProcess
                         
.map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
                         .toArray(LogicalType[]::new);
 
-        if (rowTimeIdx >= 0) {
-            if (isRowsClause) {
-                return new RowTimeRowsBoundedPrecedingFunction<>(
-                        config.getStateRetentionTime(),
-                        TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                        genAggsHandler,
-                        flattenAccTypes,
-                        fieldTypes,
-                        precedingOffset,
-                        rowTimeIdx);
-            } else {
-                return new RowTimeRangeBoundedPrecedingFunction<>(
-                        genAggsHandler, flattenAccTypes, fieldTypes, 
precedingOffset, rowTimeIdx);
-            }
-        } else {
-            if (isRowsClause) {
-                return new ProcTimeRowsBoundedPrecedingFunction<>(
-                        config.getStateRetentionTime(),
-                        TableConfigUtils.getMaxIdleStateRetentionTime(config),
-                        genAggsHandler,
-                        flattenAccTypes,
-                        fieldTypes,
-                        precedingOffset);
-            } else {
-                return new ProcTimeRangeBoundedPrecedingFunction<>(
-                        genAggsHandler, flattenAccTypes, fieldTypes, 
precedingOffset);
-            }
+        switch (timeAttribute) {
+            case ROW_TIME:
+                final int rowTimeIdx = orderKeys[0];
+                if (isRowsClause) {
+                    return new RowTimeRowsBoundedPrecedingFunction<>(
+                            config.getStateRetentionTime(),
+                            
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                            genAggsHandler,
+                            flattenAccTypes,
+                            fieldTypes,
+                            precedingOffset,
+                            rowTimeIdx);
+                } else {
+                    return new RowTimeRangeBoundedPrecedingFunction<>(
+                            genAggsHandler,
+                            flattenAccTypes,
+                            fieldTypes,
+                            precedingOffset,
+                            rowTimeIdx);
+                }
+            case PROC_TIME:
+                if (isRowsClause) {
+                    return new ProcTimeRowsBoundedPrecedingFunction<>(
+                            config.getStateRetentionTime(),
+                            
TableConfigUtils.getMaxIdleStateRetentionTime(config),
+                            genAggsHandler,
+                            flattenAccTypes,
+                            fieldTypes,
+                            precedingOffset);
+                } else {
+                    return new ProcTimeRangeBoundedPrecedingFunction<>(
+                            genAggsHandler, flattenAccTypes, fieldTypes, 
precedingOffset);
+                }
+            case NON_TIME:
+                throw new TableException(
+                        "Non-time attribute sort is not supported for bounded 
over aggregate");
+            default:
+                throw new TableException("Unsupported bounded operation");

Review Comment:
   ```suggestion
                   throw new TableException("Unsupported bounded operation for 
OVER window.");
   ```
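
For reference, the dispatch pattern in this hunk can be sketched in isolation. This is a minimal, self-contained illustration under stated assumptions, not the PR's implementation: `OverDispatchSketch` and `selectBoundedFunction` are hypothetical names, the method returns the name of the function class instead of constructing it, and `IllegalStateException` stands in for `TableException` so that the sketch has no Flink dependency.

```java
final class OverDispatchSketch {

    /** Constant names mirror the diff; the enum is redeclared here as a stand-in. */
    enum TimeAttribute {
        ROW_TIME,
        PROC_TIME,
        NON_TIME
    }

    /**
     * Hypothetical helper: returns the simple name of the bounded OVER function
     * that the switch in the diff would instantiate.
     */
    static String selectBoundedFunction(TimeAttribute timeAttribute, boolean isRowsClause) {
        switch (timeAttribute) {
            case ROW_TIME:
                return isRowsClause
                        ? "RowTimeRowsBoundedPrecedingFunction"
                        : "RowTimeRangeBoundedPrecedingFunction";
            case PROC_TIME:
                return isRowsClause
                        ? "ProcTimeRowsBoundedPrecedingFunction"
                        : "ProcTimeRangeBoundedPrecedingFunction";
            case NON_TIME:
                // Stand-in for the TableException thrown in the diff.
                throw new IllegalStateException(
                        "Non-time attribute sort is not supported for bounded over aggregate");
            default:
                // Message follows the wording suggested in this review comment.
                throw new IllegalStateException(
                        "Unsupported bounded operation for OVER window.");
        }
    }

    public static void main(String[] args) {
        System.out.println(selectBoundedFunction(TimeAttribute.ROW_TIME, true));
        System.out.println(selectBoundedFunction(TimeAttribute.PROC_TIME, false));
    }
}
```

Every branch either returns or throws, so no `break` statements are needed and the `default` branch only guards against enum constants added later.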



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/OverAggregateNDUTest.java:
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import 
org.apache.flink.table.api.config.OptimizerConfigOptions.NonDeterministicUpdateStrategy;
+import org.apache.flink.table.planner.utils.StreamTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test NDU errors for over aggregate. */
+class OverAggregateNDUTest extends TableTestBase {

Review Comment:
   Can we move this into `NonDeterministicUpdateAnalyzerTest`? It might fit 
better there than in the exec/stream package.



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java:
##########
@@ -0,0 +1,688 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.over;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.StateTtlConfig;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore;
+import org.apache.flink.table.runtime.generated.AggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
+import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
+import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
+import org.apache.flink.table.runtime.generated.RecordEqualiser;
+import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static 
org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
+
+/** A basic implementation to support non-time range unbounded over aggregate 
with retract mode. */

Review Comment:
   I saw that the methods have good JavaDoc. But could you add a bit more 
JavaDoc with a very high-level summary of how this operator works? What are the 
base assumptions? What are the IDs stored in state? What was your goal? What 
are the limitations?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalOverAggregate.scala:
##########
@@ -27,9 +29,12 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.Window
+import org.apache.calcite.util.ImmutableBitSet

Review Comment:
   Remove unnecessary changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
