dianfu commented on a change in pull request #13504:
URL: https://github.com/apache/flink/pull/13504#discussion_r503671557



##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+       transient LinkedList<RowData> windowData;
+
+       public AbstractStreamArrowPythonBoundedRowsOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+               windowData = new LinkedList<>();
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+                               // There are records left to process because a 
watermark has not been received yet.
+                               // This would only happen if the input stream 
has stopped. So we don't need to clean up.
+                               // We leave the state as it is and schedule a 
new cleanup timer
+                               
registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();
+               for (Long dataTs : keyIter) {
+                       insertToSortedList(dataTs);
+               }
+               int index = sortedTimestamps.indexOf(timestamp);
+               for (int i = 0; i < inputs.size(); i++) {
+                       forwardedInputQueue.add(inputs.get(i));
+                       triggerWindowProcess(inputs, i, index);
+               }
+               windowData.clear();
+               sortedTimestamps.clear();
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       RowData key = forwardedInputQueue.poll();
+                       reuseJoinedRow.setRowKind(key.getRowKind());
+                       rowDataWrapper.collect(reuseJoinedRow.replace(key, 
data));
+               }
+       }
+
+       void registerProcessingCleanupTimer(long currentTime) throws Exception {
+               if (stateCleaningEnabled) {
+                       registerProcessingCleanupTimer(
+                               cleanupTsState,
+                               currentTime,
+                               minRetentionTime,
+                               maxRetentionTime,
+                               timerService);
+               }
+       }
+
+       void triggerWindowProcess(List<RowData> inputs, int i, int index) 
throws Exception {
+               int startIndex;
+               int startPos = 0;
+               if (windowData.isEmpty()) {
+                       if (i >= lowerBoundary) {
+                               for (int j = (int) (i - lowerBoundary); j <= i; 
j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       
arrowSerializer.write(getFunctionInput(rowData));
+                               }
+                               currentBatchCount += lowerBoundary;
+                       } else {
+                               Long previousTimestamp;
+                               List<RowData> previousData = null;
+                               int length = 0;
+                               startIndex = index - 1;
+                               long remainingDataCount = lowerBoundary - i;
+                               ListIterator<Long> iter = 
sortedTimestamps.listIterator(index);
+                               while (remainingDataCount > 0 && 
iter.hasPrevious()) {
+                                       previousTimestamp = iter.previous();
+                                       previousData = 
inputState.get(previousTimestamp);
+                                       length = previousData.size();
+                                       if (remainingDataCount <= length) {
+                                               startPos = (int) (length - 
remainingDataCount);
+                                               remainingDataCount = 0;
+                                       } else {
+                                               remainingDataCount -= length;
+                                               startIndex--;
+                                       }
+                               }
+                               if (previousData != null) {
+                                       for (int j = startPos; j < length; j++) 
{
+                                               RowData rowData = 
previousData.get(j);
+                                               windowData.add(rowData);
+                                               
arrowSerializer.write(getFunctionInput(rowData));
+                                               currentBatchCount++;
+                                       }
+                                       // clear outdated data.
+                                       ListIterator<Long> clearIter = 
sortedTimestamps.listIterator();
+                                       for (int j = 0; j < startIndex; j++) {
+                                               long outdatedTs = 
clearIter.next();
+                                               inputState.remove(outdatedTs);
+                                       }
+
+                                       startIndex++;
+                                       while (startIndex < index) {
+                                               previousTimestamp = iter.next();
+                                               previousData = 
inputState.get(previousTimestamp);
+                                               for (RowData rowData : 
previousData) {
+                                                       windowData.add(rowData);
+                                                       
arrowSerializer.write(getFunctionInput(rowData));
+                                               }
+                                               currentBatchCount += 
previousData.size();
+                                               startIndex++;
+                                       }
+                               }
+                               for (int j = 0; j <= i; j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       
arrowSerializer.write(getFunctionInput(rowData));
+                               }
+                               currentBatchCount += i + 1;
+                       }
+               } else {
+                       if (windowData.size() <= lowerBoundary) {
+                               windowData.add(inputs.get(i));
+                       } else {
+                               windowData.pop();
+                               windowData.add(inputs.get(i));
+                       }
+                       for (RowData rowData : windowData) {
+                               
arrowSerializer.write(getFunctionInput(rowData));
+                       }
+                       currentBatchCount += windowData.size();
+               }
+               if (currentBatchCount > 0) {
+                       arrowSerializer.finishCurrentBatch();
+                       pythonFunctionRunner.process(baos.toByteArray());
+                       elementCount += currentBatchCount;
+                       checkInvokeFinishBundleByCount();
+                       currentBatchCount = 0;
+                       baos.reset();
+               }
+       }
+
+       void insertToSortedList(Long dataTs) {
+               ListIterator<Long> listIterator = 
sortedTimestamps.listIterator(0);

Review comment:
       There is no need to insert timestamps that are higher than the
watermark. What do you think?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+       transient LinkedList<RowData> windowData;
+
+       public AbstractStreamArrowPythonBoundedRowsOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+               windowData = new LinkedList<>();
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+                               // There are records left to process because a 
watermark has not been received yet.
+                               // This would only happen if the input stream 
has stopped. So we don't need to clean up.
+                               // We leave the state as it is and schedule a 
new cleanup timer
+                               
registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();

Review comment:
       What about extracting the following lines of code into a separate 
method, for example, buildSortedTimestamps? Then we could also use it in 
StreamArrowPythonProcTimeBoundedRowsOperator. 
   ```
                Iterable<Long> keyIter = inputState.keys();
                for (Long dataTs : keyIter) {
                        insertToSortedList(dataTs);
                }
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+       transient LinkedList<RowData> windowData;
+
+       public AbstractStreamArrowPythonBoundedRowsOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+               windowData = new LinkedList<>();
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+                               // There are records left to process because a 
watermark has not been received yet.
+                               // This would only happen if the input stream 
has stopped. So we don't need to clean up.
+                               // We leave the state as it is and schedule a 
new cleanup timer
+                               
registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();
+               for (Long dataTs : keyIter) {
+                       insertToSortedList(dataTs);
+               }
+               int index = sortedTimestamps.indexOf(timestamp);
+               for (int i = 0; i < inputs.size(); i++) {
+                       forwardedInputQueue.add(inputs.get(i));
+                       triggerWindowProcess(inputs, i, index);
+               }
+               windowData.clear();
+               sortedTimestamps.clear();
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       RowData key = forwardedInputQueue.poll();
+                       reuseJoinedRow.setRowKind(key.getRowKind());
+                       rowDataWrapper.collect(reuseJoinedRow.replace(key, 
data));
+               }
+       }
+
+       void registerProcessingCleanupTimer(long currentTime) throws Exception {
+               if (stateCleaningEnabled) {
+                       registerProcessingCleanupTimer(
+                               cleanupTsState,
+                               currentTime,
+                               minRetentionTime,
+                               maxRetentionTime,
+                               timerService);
+               }
+       }
+
+       void triggerWindowProcess(List<RowData> inputs, int i, int index) 
throws Exception {
+               int startIndex;
+               int startPos = 0;
+               if (windowData.isEmpty()) {
+                       if (i >= lowerBoundary) {
+                               for (int j = (int) (i - lowerBoundary); j <= i; 
j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);

Review comment:
       What about just building the windowData list when windowData is empty?
I think we could then share the same logic for performing the serialization.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonOverAggregate.scala
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.memory.ManagedMemoryUseCase
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{
+  ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME
+}
+import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, 
OverAggregateUtil}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+  * Stream physical RelNode for python time-based over [[Window]].
+  */
+class StreamExecPythonOverAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    inputRowType: RelDataType,
+    logicWindow: Window)
+  extends StreamExecOverAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    inputRowType,
+    logicWindow)
+  with CommonPythonAggregate {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+    new StreamExecPythonOverAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      inputRowType,
+      logicWindow
+      )
+  }
+
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    val tableConfig = planner.getTableConfig
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+        "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+        "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && 
tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates 
state. " +
+          "Please provide a query configuration with valid retention interval 
to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not 
clean up the state.")
+    }
+
+    val timeType = 
outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time 
attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if 
(FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+        "OVER windows can only be applied on time attributes.")
+    }
+
+    if (overWindow.lowerBound.isPreceding
+      && overWindow.lowerBound.isUnbounded) {
+      throw new TableException(
+        "Python UDAF are not supported to be used in UNBOUNDED PRECEDING OVER 
windows."
+      )
+    } else if (!overWindow.upperBound.isCurrentRow) {
+      throw new TableException(
+        "Python UDAF are not supported to be used in UNBOUNDED FOLLOWING OVER 
windows."

Review comment:
       ditto

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+       transient LinkedList<RowData> windowData;
+
+       public AbstractStreamArrowPythonBoundedRowsOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+               windowData = new LinkedList<>();
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+                               // There are records left to process because a 
watermark has not been received yet.
+                               // This would only happen if the input stream 
has stopped. So we don't need to clean up.
+                               // We leave the state as it is and schedule a 
new cleanup timer
+                               
registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {

Review comment:
       What about moving this method to 
StreamArrowPythonRowTimeBoundedRowsOperator, as it's only used for event time?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+       transient LinkedList<RowData> windowData;
+
+       public AbstractStreamArrowPythonBoundedRowsOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+               windowData = new LinkedList<>();
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+                               // There are records left to process because a 
watermark has not been received yet.
+                               // This would only happen if the input stream 
has stopped. So we don't need to clean up.
+                               // We leave the state as it is and schedule a 
new cleanup timer
+                               
registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();
+               for (Long dataTs : keyIter) {
+                       insertToSortedList(dataTs);
+               }
+               int index = sortedTimestamps.indexOf(timestamp);
+               for (int i = 0; i < inputs.size(); i++) {
+                       forwardedInputQueue.add(inputs.get(i));
+                       triggerWindowProcess(inputs, i, index);
+               }
+               windowData.clear();
+               sortedTimestamps.clear();
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       RowData key = forwardedInputQueue.poll();
+                       reuseJoinedRow.setRowKind(key.getRowKind());
+                       rowDataWrapper.collect(reuseJoinedRow.replace(key, 
data));
+               }
+       }
+
+       void registerProcessingCleanupTimer(long currentTime) throws Exception {
+               if (stateCleaningEnabled) {
+                       registerProcessingCleanupTimer(
+                               cleanupTsState,
+                               currentTime,
+                               minRetentionTime,
+                               maxRetentionTime,
+                               timerService);
+               }
+       }
+
+       void triggerWindowProcess(List<RowData> inputs, int i, int index) 
throws Exception {
+               int startIndex;
+               int startPos = 0;
+               if (windowData.isEmpty()) {
+                       if (i >= lowerBoundary) {
+                               for (int j = (int) (i - lowerBoundary); j <= i; 
j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       
arrowSerializer.write(getFunctionInput(rowData));
+                               }
+                               currentBatchCount += lowerBoundary;
+                       } else {
+                               Long previousTimestamp;
+                               List<RowData> previousData = null;
+                               int length = 0;
+                               startIndex = index - 1;
+                               long remainingDataCount = lowerBoundary - i;
+                               ListIterator<Long> iter = 
sortedTimestamps.listIterator(index);
+                               while (remainingDataCount > 0 && 
iter.hasPrevious()) {
+                                       previousTimestamp = iter.previous();
+                                       previousData = 
inputState.get(previousTimestamp);
+                                       length = previousData.size();
+                                       if (remainingDataCount <= length) {
+                                               startPos = (int) (length - 
remainingDataCount);
+                                               remainingDataCount = 0;
+                                       } else {
+                                               remainingDataCount -= length;
+                                               startIndex--;
+                                       }
+                               }
+                               if (previousData != null) {
+                                       for (int j = startPos; j < length; j++) 
{
+                                               RowData rowData = 
previousData.get(j);
+                                               windowData.add(rowData);
+                                               
arrowSerializer.write(getFunctionInput(rowData));
+                                               currentBatchCount++;
+                                       }
+                                       // clear outdated data.
+                                       ListIterator<Long> clearIter = 
sortedTimestamps.listIterator();
+                                       for (int j = 0; j < startIndex; j++) {
+                                               long outdatedTs = 
clearIter.next();
+                                               inputState.remove(outdatedTs);
+                                       }
+
+                                       startIndex++;
+                                       while (startIndex < index) {
+                                               previousTimestamp = iter.next();
+                                               previousData = 
inputState.get(previousTimestamp);
+                                               for (RowData rowData : 
previousData) {
+                                                       windowData.add(rowData);
+                                                       
arrowSerializer.write(getFunctionInput(rowData));
+                                               }
+                                               currentBatchCount += 
previousData.size();
+                                               startIndex++;
+                                       }
+                               }
+                               for (int j = 0; j <= i; j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       
arrowSerializer.write(getFunctionInput(rowData));
+                               }
+                               currentBatchCount += i + 1;
+                       }
+               } else {
+                       if (windowData.size() <= lowerBoundary) {

Review comment:
       This could be improved as follows:
   ```
   if (windowData.size() > lowerBoundary) {
      windowData.pop();
   }
   windowData.add(inputs.get(i));
   ```

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+       transient LinkedList<RowData> windowData;
+
+       public AbstractStreamArrowPythonBoundedRowsOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+               windowData = new LinkedList<>();
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+                               // There are records left to process because a 
watermark has not been received yet.
+                               // This would only happen if the input stream 
has stopped. So we don't need to clean up.
+                               // We leave the state as it is and schedule a 
new cleanup timer
+                               
registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();
+               for (Long dataTs : keyIter) {
+                       insertToSortedList(dataTs);
+               }
+               int index = sortedTimestamps.indexOf(timestamp);
+               for (int i = 0; i < inputs.size(); i++) {
+                       forwardedInputQueue.add(inputs.get(i));
+                       triggerWindowProcess(inputs, i, index);
+               }
+               windowData.clear();
+               sortedTimestamps.clear();
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       RowData key = forwardedInputQueue.poll();
+                       reuseJoinedRow.setRowKind(key.getRowKind());
+                       rowDataWrapper.collect(reuseJoinedRow.replace(key, 
data));
+               }
+       }
+
+       void registerProcessingCleanupTimer(long currentTime) throws Exception {
+               if (stateCleaningEnabled) {
+                       registerProcessingCleanupTimer(
+                               cleanupTsState,
+                               currentTime,
+                               minRetentionTime,
+                               maxRetentionTime,
+                               timerService);
+               }
+       }
+
+       void triggerWindowProcess(List<RowData> inputs, int i, int index) 
throws Exception {
+               int startIndex;
+               int startPos = 0;
+               if (windowData.isEmpty()) {
+                       if (i >= lowerBoundary) {
+                               for (int j = (int) (i - lowerBoundary); j <= i; 
j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       
arrowSerializer.write(getFunctionInput(rowData));
+                               }
+                               currentBatchCount += lowerBoundary;
+                       } else {
+                               Long previousTimestamp;
+                               List<RowData> previousData = null;
+                               int length = 0;
+                               startIndex = index - 1;
+                               long remainingDataCount = lowerBoundary - i;
+                               ListIterator<Long> iter = 
sortedTimestamps.listIterator(index);
+                               while (remainingDataCount > 0 && 
iter.hasPrevious()) {
+                                       previousTimestamp = iter.previous();
+                                       previousData = 
inputState.get(previousTimestamp);
+                                       length = previousData.size();
+                                       if (remainingDataCount <= length) {
+                                               startPos = (int) (length - 
remainingDataCount);
+                                               remainingDataCount = 0;
+                                       } else {
+                                               remainingDataCount -= length;
+                                               startIndex--;
+                                       }
+                               }
+                               if (previousData != null) {
+                                       for (int j = startPos; j < length; j++) 
{
+                                               RowData rowData = 
previousData.get(j);
+                                               windowData.add(rowData);
+                                               
arrowSerializer.write(getFunctionInput(rowData));
+                                               currentBatchCount++;
+                                       }
+                                       // clear outdated data.
+                                       ListIterator<Long> clearIter = 
sortedTimestamps.listIterator();
+                                       for (int j = 0; j < startIndex; j++) {
+                                               long outdatedTs = 
clearIter.next();
+                                               inputState.remove(outdatedTs);
+                                       }
+
+                                       startIndex++;
+                                       while (startIndex < index) {
+                                               previousTimestamp = iter.next();
+                                               previousData = 
inputState.get(previousTimestamp);
+                                               for (RowData rowData : 
previousData) {
+                                                       windowData.add(rowData);
+                                                       
arrowSerializer.write(getFunctionInput(rowData));
+                                               }
+                                               currentBatchCount += 
previousData.size();
+                                               startIndex++;
+                                       }
+                               }
+                               for (int j = 0; j <= i; j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       
arrowSerializer.write(getFunctionInput(rowData));
+                               }
+                               currentBatchCount += i + 1;
+                       }
+               } else {
+                       if (windowData.size() <= lowerBoundary) {
+                               windowData.add(inputs.get(i));
+                       } else {
+                               windowData.pop();
+                               windowData.add(inputs.get(i));
+                       }
+                       for (RowData rowData : windowData) {
+                               
arrowSerializer.write(getFunctionInput(rowData));
+                       }
+                       currentBatchCount += windowData.size();
+               }
+               if (currentBatchCount > 0) {

Review comment:
       Extract this code into a separate class and share it between the rows and 
range over window operators?

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRangeOperator.java
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRangeOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
{
+
+       private static final long serialVersionUID = 1L;
+
+       private transient LinkedList<List<RowData>> inputData;
+
+       public AbstractStreamArrowPythonBoundedRangeOperator(
+               Configuration config,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               inputData = new LinkedList<>();
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+               // if cleanupTsState has not been updated then it is safe to 
cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       lastTriggeringTsState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+               triggerWindowProcess(timestamp, inputs);
+               lastTriggeringTsState.update(timestamp);
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+               // if cleanupTsState has not been updated then it is safe to 
cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+
+               // we consider the original timestamp of events
+               // that have registered this time trigger 1 ms ago
+
+               long currentTime = timestamp - 1;
+
+               // get the list of elements of current proctime
+               List<RowData> currentElements = inputState.get(currentTime);
+               triggerWindowProcess(timestamp, currentElements);
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       List<RowData> input = inputData.poll();
+                       for (RowData ele : input) {
+                               reuseJoinedRow.setRowKind(ele.getRowKind());
+                               
rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+                       }
+               }
+       }
+
+       void registerCleanupTimer(long timestamp, TimeDomain domain) throws 
Exception {
+               long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+               long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 
1.5) + 1;
+               // update timestamp and register timer if needed
+               Long curCleanupTimestamp = cleanupTsState.value();
+               if (curCleanupTimestamp == null || curCleanupTimestamp < 
minCleanupTimestamp) {
+                       // we don't delete existing timer since it may delete 
timer for data processing
+                       if (domain == TimeDomain.EVENT_TIME) {
+                               
timerService.registerEventTimeTimer(maxCleanupTimestamp);
+                       } else {
+                               
timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+                       }
+                       cleanupTsState.update(maxCleanupTimestamp);
+               }
+       }
+
+       private void triggerWindowProcess(long upperLimit, List<RowData> 
inputs) throws Exception {
+               long lowerLimit = upperLimit - lowerBoundary;
+               List<Long> outdatedTs = new LinkedList<>();
+               if (inputs != null) {
+
+                       for (Map.Entry<Long, List<RowData>> entry : 
inputState.entries()) {
+                               long dataTs = entry.getKey();
+                               if (dataTs >= lowerLimit) {
+                                       if (dataTs <= upperLimit) {
+                                               List<RowData> dataList = 
entry.getValue();
+                                               if (dataList != null) {
+                                                       for (RowData data : 
dataList) {
+                                                               
arrowSerializer.write(getFunctionInput(data));
+                                                               
currentBatchCount++;
+                                                       }
+                                               }
+                                       }
+                               } else {
+                                       outdatedTs.add(dataTs);

Review comment:
       The outdated state entries could be cleared directly; there is no need to 
collect their timestamps in a list first.

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} 
Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonBoundedRowsOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> 
implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+       transient LinkedList<RowData> windowData;
+
+       public AbstractStreamArrowPythonBoundedRowsOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, 
inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+               windowData = new LinkedList<>();
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) 
throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+                               // There are records left to process because a 
watermark has not been received yet.
+                               // This would only happen if the input stream 
has stopped. So we don't need to clean up.
+                               // We leave the state as it is and schedule a 
new cleanup timer
+                               
registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws 
Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();
+               for (Long dataTs : keyIter) {
+                       insertToSortedList(dataTs);
+               }
+               int index = sortedTimestamps.indexOf(timestamp);
+               for (int i = 0; i < inputs.size(); i++) {
+                       forwardedInputQueue.add(inputs.get(i));
+                       triggerWindowProcess(inputs, i, index);
+               }
+               windowData.clear();
+               sortedTimestamps.clear();
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws 
Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       RowData key = forwardedInputQueue.poll();
+                       reuseJoinedRow.setRowKind(key.getRowKind());
+                       rowDataWrapper.collect(reuseJoinedRow.replace(key, 
data));
+               }
+       }
+
+       void registerProcessingCleanupTimer(long currentTime) throws Exception {
+               if (stateCleaningEnabled) {
+                       registerProcessingCleanupTimer(
+                               cleanupTsState,
+                               currentTime,
+                               minRetentionTime,
+                               maxRetentionTime,
+                               timerService);
+               }
+       }
+
+       void triggerWindowProcess(List<RowData> inputs, int i, int index) 
throws Exception {
+               int startIndex;
+               int startPos = 0;
+               if (windowData.isEmpty()) {
+                       if (i >= lowerBoundary) {
+                               for (int j = (int) (i - lowerBoundary); j <= i; 
j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       
arrowSerializer.write(getFunctionInput(rowData));
+                               }
+                               currentBatchCount += lowerBoundary;
+                       } else {
+                               Long previousTimestamp;
+                               List<RowData> previousData = null;
+                               int length = 0;
+                               startIndex = index - 1;
+                               long remainingDataCount = lowerBoundary - i;
+                               ListIterator<Long> iter = 
sortedTimestamps.listIterator(index);
+                               while (remainingDataCount > 0 && 
iter.hasPrevious()) {
+                                       previousTimestamp = iter.previous();
+                                       previousData = 
inputState.get(previousTimestamp);

Review comment:
       Since the data is already read from the state, why not just store it 
in windowData?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonOverAggregate.scala
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.memory.ManagedMemoryUseCase
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{
+  ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME
+}
+import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, 
OverAggregateUtil}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+  * Stream physical RelNode for python time-based over [[Window]].
+  */
+class StreamExecPythonOverAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    inputRowType: RelDataType,
+    logicWindow: Window)
+  extends StreamExecOverAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    inputRowType,
+    logicWindow)
+  with CommonPythonAggregate {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
+    new StreamExecPythonOverAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      inputRowType,
+      logicWindow
+      )
+  }
+
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    val tableConfig = planner.getTableConfig
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+        "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+        "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && 
tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates 
state. " +
+          "Please provide a query configuration with valid retention interval 
to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not 
clean up the state.")
+    }
+
+    val timeType = 
outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time 
attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if 
(FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+        "OVER windows can only be applied on time attributes.")
+    }
+
+    if (overWindow.lowerBound.isPreceding
+      && overWindow.lowerBound.isUnbounded) {
+      throw new TableException(
+        "Python UDAF are not supported to be used in UNBOUNDED PRECEDING OVER 
windows."

Review comment:
       ```suggestion
           "Python UDAF is not supported to be used in UNBOUNDED PRECEDING OVER 
windows."
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to