dianfu commented on a change in pull request #13504:
URL: https://github.com/apache/flink/pull/13504#discussion_r503085577



##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause event-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K> {
+
+       private static final long serialVersionUID = 1L;
+
+	public StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, minRetentionTime, maxRetentionTime, pandasAggFunctions,
+			inputType, outputType, inputTimeFieldIndex, lowerBoundary, groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void bufferInput(RowData input) throws Exception {
+               // register state-cleanup timer
+		registerProcessingCleanupTimer(timerService.currentProcessingTime());
+
+               // triggering timestamp for trigger calculation
+               long triggeringTs = input.getLong(inputTimeFieldIndex);
+
+               Long lastTriggeringTs = lastTriggeringTsState.value();
+               if (lastTriggeringTs == null) {
+                       lastTriggeringTs = 0L;
+               }
+
+		// check if the data is expired, if not, save the data and register event time timer
+               if (triggeringTs > lastTriggeringTs) {
+                       List<RowData> data = inputState.get(triggeringTs);
+                       if (null != data) {
+                               data.add(input);
+                               inputState.put(triggeringTs, data);

Review comment:
       ```suggestion
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause event-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonRowTimeRangeBoundedOverWindowAggregateFunctionOperator<K>

Review comment:
       ```suggestion
   public class StreamArrowPythonRowTimeBoundedRangeOperator<K>
   ```
   What about simplifying the class name a bit? Same for the other classes.

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+	public AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+       }
+
+       @Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+				// There are records left to process because a watermark has not been received yet.
+				// This would only happen if the input stream has stopped. So we don't need to clean up.
+				// We leave the state as it is and schedule a new cleanup timer
+				registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();
+               for (Long dataTs : keyIter) {
+                       insertToSortedList(dataTs);
+               }
+               int index = sortedTimestamps.indexOf(timestamp);
+               for (int i = 0; i < inputs.size(); i++) {
+                       forwardedInputQueue.add(inputs.get(i));
+                       triggerWindowProcess(inputs, i, index);
+               }
+               sortedTimestamps.clear();
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       RowData key = forwardedInputQueue.poll();
+                       reuseJoinedRow.setRowKind(key.getRowKind());
+			rowDataWrapper.collect(reuseJoinedRow.replace(key, data));
+               }
+       }
+
+       void registerProcessingCleanupTimer(long currentTime) throws Exception {
+               if (stateCleaningEnabled) {
+                       registerProcessingCleanupTimer(
+                               cleanupTsState,
+                               currentTime,
+                               minRetentionTime,
+                               maxRetentionTime,
+                               timerService);
+               }
+       }
+
+	void triggerWindowProcess(List<RowData> inputs, int i, int index) throws Exception {
+               int startIndex;
+               int startPos = 0;
+               if (i >= lowerBoundary) {
+                       for (int j = (int) (i - lowerBoundary); j <= i; j++) {
+				arrowSerializer.write(getFunctionInput(inputs.get(j)));
+                       }
+                       currentBatchCount += lowerBoundary;
+               } else {
+                       Long previousTimestamp;
+                       List<RowData> previousData = null;
+                       int length = 0;
+                       startIndex = index - 1;
+                       long remainingDataCount = lowerBoundary - i;
+			ListIterator<Long> iter = sortedTimestamps.listIterator(index);
+                       while (remainingDataCount > 0 && iter.hasPrevious()) {
+                               previousTimestamp = iter.previous();
+				previousData = inputState.get(previousTimestamp);
+                               length = previousData.size();
+                               if (remainingDataCount <= length) {
+					startPos = (int) (length - remainingDataCount);
+                                       remainingDataCount = 0;
+                               } else {
+                                       remainingDataCount -= length;
+                                       startIndex--;
+                               }
+                       }
+                       if (previousData != null) {
+                               for (int j = startPos; j < length; j++) {
+					arrowSerializer.write(getFunctionInput(previousData.get(j)));
+                                       currentBatchCount++;
+                                       startIndex++;

Review comment:
   I guess ``startIndex++`` should be moved out of the for loop.
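
   A minimal sketch of how I read this, assuming the surrounding members from this PR (`arrowSerializer`, `currentBatchCount`, `inputState`, `sortedTimestamps`); the body of the trailing while loop is my reconstruction, since the hunk is cut off here:
   ```java
   if (previousData != null) {
   	for (int j = startPos; j < length; j++) {
   		arrowSerializer.write(getFunctionInput(previousData.get(j)));
   		currentBatchCount++;
   	}
   	// moved out of the loop: the whole bucket for one timestamp has been
   	// written, so advance a single position in sortedTimestamps
   	startIndex++;
   	while (startIndex < index) {
   		previousTimestamp = iter.next();
   		previousData = inputState.get(previousTimestamp);
   		for (RowData previousRow : previousData) {
   			arrowSerializer.write(getFunctionInput(previousRow));
   			currentBatchCount++;
   		}
   		startIndex++; // again: once per consumed bucket
   	}
   }
   ```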

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonProcTimeRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause proc-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonProcTimeRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K> {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient long currentTime;
+
+	public StreamArrowPythonProcTimeRowsBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, minRetentionTime, maxRetentionTime, pandasAggFunctions,
+			inputType, outputType, inputTimeFieldIndex, lowerBoundary, groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void bufferInput(RowData input) throws Exception {
+               currentTime = timerService.currentProcessingTime();
+               // register state-cleanup timer
+               registerProcessingCleanupTimer(currentTime);
+
+		// buffer the incoming event
+
+		// add current element to the window list of elements with corresponding timestamp
+               List<RowData> rowList = inputState.get(currentTime);
+		// null value means that this is the first event received for this timestamp
+               if (rowList == null) {
+                       rowList = new ArrayList<>();
+               }
+               rowList.add(input);

Review comment:
   Make rowList an instance variable; then there is no need to read inputState again in *processElementInternal*.
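
   A rough sketch of that, with the hypothetical field name `currentRowList` (the surrounding members come from this PR; processElementInternal would then read `currentRowList` instead of calling inputState.get again):
   ```java
   private transient List<RowData> currentRowList;

   @Override
   public void bufferInput(RowData input) throws Exception {
   	currentTime = timerService.currentProcessingTime();
   	// register state-cleanup timer
   	registerProcessingCleanupTimer(currentTime);
   	// fetch the bucket for the current timestamp exactly once and keep it
   	currentRowList = inputState.get(currentTime);
   	if (currentRowList == null) {
   		// first event received for this timestamp
   		currentRowList = new ArrayList<>();
   	}
   	currentRowList.add(input);
   	inputState.put(currentTime, currentRowList);
   }
   ```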

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause event-time bounded
+ * OVER window.
+ */
+@Internal
+public class StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K> {
+
+       private static final long serialVersionUID = 1L;
+
+	public StreamArrowPythonRowTimeRowsBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, minRetentionTime, maxRetentionTime, pandasAggFunctions,
+			inputType, outputType, inputTimeFieldIndex, lowerBoundary, groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void bufferInput(RowData input) throws Exception {
+               // register state-cleanup timer
+		registerProcessingCleanupTimer(timerService.currentProcessingTime());
+
+               // triggering timestamp for trigger calculation
+               long triggeringTs = input.getLong(inputTimeFieldIndex);
+
+               Long lastTriggeringTs = lastTriggeringTsState.value();
+               if (lastTriggeringTs == null) {
+                       lastTriggeringTs = 0L;
+               }
+
+		// check if the data is expired, if not, save the data and register event time timer
+               if (triggeringTs > lastTriggeringTs) {
+                       List<RowData> data = inputState.get(triggeringTs);
+                       if (null != data) {
+                               data.add(input);
+                               inputState.put(triggeringTs, data);
+                       } else {
+                               data = new ArrayList<>();
+                               data.add(input);
+                               inputState.put(triggeringTs, data);

Review comment:
       ```suggestion
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.functions.CleanupState;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for ROWS clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> implements CleanupState {
+
+       private static final long serialVersionUID = 1L;
+
+       private final long minRetentionTime;
+
+       private final long maxRetentionTime;
+
+       private final boolean stateCleaningEnabled;
+
+       /**
+        * list to sort timestamps to access rows in timestamp order.
+        */
+       transient LinkedList<Long> sortedTimestamps;
+
+	public AbstractStreamArrowPythonRowsBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               long minRetentionTime,
+               long maxRetentionTime,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+               this.minRetentionTime = minRetentionTime;
+               this.maxRetentionTime = maxRetentionTime;
+               this.stateCleaningEnabled = minRetentionTime > 1;
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               sortedTimestamps = new LinkedList<>();
+       }
+
+       @Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               if (stateCleaningEnabled) {
+
+                       Iterator<Long> keysIt = inputState.keys().iterator();
+                       Long lastProcessedTime = lastTriggeringTsState.value();
+                       if (lastProcessedTime == null) {
+                               lastProcessedTime = 0L;
+                       }
+
+                       // is data left which has not been processed yet?
+                       boolean noRecordsToProcess = true;
+                       while (keysIt.hasNext() && noRecordsToProcess) {
+                               if (keysIt.next() > lastProcessedTime) {
+                                       noRecordsToProcess = false;
+                               }
+                       }
+
+                       if (noRecordsToProcess) {
+                               inputState.clear();
+                               cleanupTsState.clear();
+                       } else {
+				// There are records left to process because a watermark has not been received yet.
+				// This would only happen if the input stream has stopped. So we don't need to clean up.
+				// We leave the state as it is and schedule a new cleanup timer
+				registerProcessingCleanupTimer(timerService.currentProcessingTime());
+                       }
+               }
+       }
+
+       @Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+
+               Iterable<Long> keyIter = inputState.keys();
+               for (Long dataTs : keyIter) {
+                       insertToSortedList(dataTs);
+               }
+               int index = sortedTimestamps.indexOf(timestamp);
+               for (int i = 0; i < inputs.size(); i++) {
+                       forwardedInputQueue.add(inputs.get(i));
+                       triggerWindowProcess(inputs, i, index);
+               }
+               sortedTimestamps.clear();
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       RowData key = forwardedInputQueue.poll();
+                       reuseJoinedRow.setRowKind(key.getRowKind());
+			rowDataWrapper.collect(reuseJoinedRow.replace(key, data));
+               }
+       }
+
+       void registerProcessingCleanupTimer(long currentTime) throws Exception {
+               if (stateCleaningEnabled) {
+                       registerProcessingCleanupTimer(
+                               cleanupTsState,
+                               currentTime,
+                               minRetentionTime,
+                               maxRetentionTime,
+                               timerService);
+               }
+       }
+
+	void triggerWindowProcess(List<RowData> inputs, int i, int index) throws Exception {
+               int startIndex;
+               int startPos = 0;
+               if (i >= lowerBoundary) {
+                       for (int j = (int) (i - lowerBoundary); j <= i; j++) {
+				arrowSerializer.write(getFunctionInput(inputs.get(j)));
+                       }
+                       currentBatchCount += lowerBoundary;
+               } else {
+                       Long previousTimestamp;
+                       List<RowData> previousData = null;
+                       int length = 0;
+                       startIndex = index - 1;
+                       long remainingDataCount = lowerBoundary - i;
+			ListIterator<Long> iter = sortedTimestamps.listIterator(index);
+                       while (remainingDataCount > 0 && iter.hasPrevious()) {
+                               previousTimestamp = iter.previous();
+				previousData = inputState.get(previousTimestamp);
+                               length = previousData.size();
+                               if (remainingDataCount <= length) {
+					startPos = (int) (length - remainingDataCount);
+                                       remainingDataCount = 0;
+                               } else {
+                                       remainingDataCount -= length;
+                                       startIndex--;
+                               }
+                       }
+                       if (previousData != null) {
+                               for (int j = startPos; j < length; j++) {
+					arrowSerializer.write(getFunctionInput(previousData.get(j)));
+                                       currentBatchCount++;
+                                       startIndex++;
+                               }
+                               while (startIndex < index) {
+                                       previousTimestamp = iter.next();
+					previousData = inputState.get(previousTimestamp);

Review comment:
   The data stored in the *inputState* will be read multiple times: 2 * inputs.size().
   
   What about buffering them in memory for performance optimization?
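
   One possible shape for the buffering, sketched against onEventTime from this PR; the local `cachedInputs` map and the extra triggerWindowProcess parameter are hypothetical, and java.util.HashMap/Map would need to be imported:
   ```java
   @Override
   public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
   	long timestamp = timer.getTimestamp();
   	// read each bucket from state exactly once and keep it on the heap
   	Map<Long, List<RowData>> cachedInputs = new HashMap<>();
   	for (Map.Entry<Long, List<RowData>> entry : inputState.entries()) {
   		cachedInputs.put(entry.getKey(), entry.getValue());
   		insertToSortedList(entry.getKey());
   	}
   	List<RowData> inputs = cachedInputs.get(timestamp);
   	int index = sortedTimestamps.indexOf(timestamp);
   	for (int i = 0; i < inputs.size(); i++) {
   		forwardedInputQueue.add(inputs.get(i));
   		// triggerWindowProcess would look buckets up in cachedInputs
   		// instead of calling inputState.get(previousTimestamp) again
   		triggerWindowProcess(cachedInputs, inputs, i, index);
   	}
   	sortedTimestamps.clear();
   }
   ```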

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient LinkedList<List<RowData>> inputData;
+
+	public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               inputData = new LinkedList<>();
+       }
+
+       @Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       lastTriggeringTsState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+               triggerWindowProcess(timestamp, inputs);
+               lastTriggeringTsState.update(timestamp);
+       }
+
+       @Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+
+               // we consider the original timestamp of events
+               // that have registered this time trigger 1 ms ago
+
+               long currentTime = timestamp - 1;
+
+               // get the list of elements of current proctime
+               List<RowData> currentElements = inputState.get(currentTime);
+               triggerWindowProcess(timestamp, currentElements);
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       List<RowData> input = inputData.poll();
+                       for (RowData ele : input) {
+                               reuseJoinedRow.setRowKind(ele.getRowKind());
+				rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+                       }
+               }
+       }
+
+	void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+               long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+               // update timestamp and register timer if needed
+               Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+                       if (domain == TimeDomain.EVENT_TIME) {
+				timerService.registerEventTimeTimer(maxCleanupTimestamp);
+                       } else {
+				timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+                       }
+                       cleanupTsState.update(maxCleanupTimestamp);
+               }
+       }
+
+	private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {
+               long limit = timestamp - lowerBoundary;

Review comment:
       ```suggestion
                long lowerLimit = timestamp - lowerBoundary;
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient LinkedList<List<RowData>> inputData;
+
+	public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               inputData = new LinkedList<>();
+       }
+
+       @Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       lastTriggeringTsState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+               triggerWindowProcess(timestamp, inputs);
+               lastTriggeringTsState.update(timestamp);
+       }
+
+       @Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+
+               // we consider the original timestamp of events
+               // that have registered this time trigger 1 ms ago
+
+               long currentTime = timestamp - 1;
+
+               // get the list of elements of current proctime
+               List<RowData> currentElements = inputState.get(currentTime);
+               triggerWindowProcess(timestamp, currentElements);
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       List<RowData> input = inputData.poll();
+                       for (RowData ele : input) {
+                               reuseJoinedRow.setRowKind(ele.getRowKind());
+				rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+                       }
+               }
+       }
+
+	void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+               long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+               // update timestamp and register timer if needed
+               Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+                       if (domain == TimeDomain.EVENT_TIME) {
+				timerService.registerEventTimeTimer(maxCleanupTimestamp);
+                       } else {
+				timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+                       }
+                       cleanupTsState.update(maxCleanupTimestamp);
+               }
+       }
+
+	private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {
+               long limit = timestamp - lowerBoundary;
+               if (inputs != null) {
+                       for (long dataTs : inputState.keys()) {

Review comment:
   What about using inputState.entries to avoid reading the same state twice?
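
   Something like this, assuming `inputState` is a `MapState<Long, List<RowData>>` and picking up the `lowerLimit` rename suggested above; the loop body follows the pattern used by the other operators in this PR:
   ```java
   for (Map.Entry<Long, List<RowData>> entry : inputState.entries()) {
   	long dataTs = entry.getKey();
   	if (dataTs >= lowerLimit && dataTs <= timestamp) {
   		// key and value come from a single state access
   		for (RowData data : entry.getValue()) {
   			arrowSerializer.write(getFunctionInput(data));
   			currentBatchCount++;
   		}
   	}
   }
   ```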

##########
File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/rules/physical/stream/StreamExecPythonOverAggregateRule.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.rules.physical.stream;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.functions.python.PythonFunctionKind;
+import org.apache.flink.table.planner.plan.nodes.FlinkConventions;
+import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalOverAggregate;
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate;
+import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
+import org.apache.flink.table.planner.plan.utils.PythonUtil;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.convert.ConverterRule;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.util.ImmutableBitSet;
+
+import java.util.List;
+
+/**
+ * The physical rule is responsible for converting {@link FlinkLogicalOverAggregate} to
+ * {@link StreamExecPythonOverAggregate}.
+ */
+public class StreamExecPythonOverAggregateRule extends ConverterRule {
+	public static final StreamExecPythonOverAggregateRule INSTANCE = new StreamExecPythonOverAggregateRule();
+
+       private StreamExecPythonOverAggregateRule() {
+               super(FlinkLogicalOverAggregate.class,
+                       FlinkConventions.LOGICAL(),
+                       FlinkConventions.STREAM_PHYSICAL(),
+                       "StreamExecPythonOverAggregateRule");
+       }
+
+       @Override
+       public boolean matches(RelOptRuleCall call) {
+               FlinkLogicalOverAggregate logicWindow = call.rel(0);
+		List<AggregateCall> aggCalls = logicWindow.groups.get(0).getAggregateCalls(logicWindow);
+
+               boolean existGeneralPythonFunction =
+			aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.GENERAL));
+               boolean existPandasFunction =
+			aggCalls.stream().anyMatch(x -> PythonUtil.isPythonAggregate(x, PythonFunctionKind.PANDAS));
+               boolean existJavaFunction =
+			aggCalls.stream().anyMatch(x -> !PythonUtil.isPythonAggregate(x, null));
+               if (existPandasFunction || existGeneralPythonFunction) {
+                       if (existGeneralPythonFunction) {
+				throw new TableException("non-Pandas UDAFs are not supported in stream mode currently.");

Review comment:
       ```suggestion
				throw new TableException("Non-Pandas Python UDAFs are not supported in stream mode currently.");
   ```

##########
File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+	extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient LinkedList<List<RowData>> inputData;
+
+	public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+		super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               inputData = new LinkedList<>();
+       }
+
+       @Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       lastTriggeringTsState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+               triggerWindowProcess(timestamp, inputs);
+               lastTriggeringTsState.update(timestamp);
+       }
+
+       @Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+		// if cleanupTsState has not been updated then it is safe to cleanup states
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+
+               // we consider the original timestamp of events
+               // that have registered this time trigger 1 ms ago
+
+               long currentTime = timestamp - 1;
+
+               // get the list of elements of current proctime
+               List<RowData> currentElements = inputState.get(currentTime);
+               triggerWindowProcess(timestamp, currentElements);
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+	public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       List<RowData> input = inputData.poll();
+                       for (RowData ele : input) {
+                               reuseJoinedRow.setRowKind(ele.getRowKind());
+				rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+                       }
+               }
+       }
+
+	void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+               long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+		long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+               // update timestamp and register timer if needed
+               Long curCleanupTimestamp = cleanupTsState.value();
+		if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+			// we don't delete existing timer since it may delete timer for data processing
+                       if (domain == TimeDomain.EVENT_TIME) {
+				timerService.registerEventTimeTimer(maxCleanupTimestamp);
+                       } else {
+				timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+                       }
+                       cleanupTsState.update(maxCleanupTimestamp);
+               }
+       }
+
+	private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {
+               long limit = timestamp - lowerBoundary;
+               if (inputs != null) {
+                       for (long dataTs : inputState.keys()) {
+                               if (dataTs >= limit && dataTs <= timestamp) {
+					List<RowData> dataList = inputState.get(dataTs);

Review comment:
   The state in *inputState* is never cleared. I think we should also remove outdated state in *triggerWindowProcess*.
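
   A sketch of what that could look like, assuming an ascending event-time RANGE window so that buckets older than `lowerLimit` can never enter a future window (java.util.Iterator and java.util.Map would need to be imported):
   ```java
   Iterator<Map.Entry<Long, List<RowData>>> stateIterator = inputState.iterator();
   while (stateIterator.hasNext()) {
   	Map.Entry<Long, List<RowData>> entry = stateIterator.next();
   	long dataTs = entry.getKey();
   	if (dataTs < lowerLimit) {
   		// outdated bucket: it can never be inside a window again
   		stateIterator.remove();
   	} else if (dataTs <= timestamp) {
   		for (RowData data : entry.getValue()) {
   			arrowSerializer.write(getFunctionInput(data));
   			currentBatchCount++;
   		}
   	}
   }
   ```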

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonOverAggregate.scala
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.memory.ManagedMemoryUseCase
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{
+  ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME
+}
+import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, OverAggregateUtil}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+  * Stream physical RelNode for python time-based over [[Window]].
+  */
+class StreamExecPythonOverAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    inputRowType: RelDataType,
+    logicWindow: Window)
+  extends StreamExecOverAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    inputRowType,
+    logicWindow)
+  with CommonPythonAggregate {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new StreamExecPythonOverAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      inputRowType,
+      logicWindow
+      )
+  }
+
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    val tableConfig = planner.getTableConfig
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+        "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+        "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+          "Please provide a query configuration with valid retention interval to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
+    val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time 
attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+        "OVER windows can only be applied on time attributes.")
+    }
+
+    if (overWindow.lowerBound.isPreceding
+      && overWindow.lowerBound.isUnbounded) {
+      throw new TableException(
+        "OVER PRECEDING windows are not supported yet."
+      )
+    } else if (!overWindow.upperBound.isCurrentRow) {
+      throw new TableException(
+        "OVER FOLLOWING windows are not supported yet."

Review comment:
       ```suggestion
        "UNBOUNDED FOLLOWING OVER windows are not supported yet for Python UDAF."
       ```

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecPythonOverAggregate.scala
##########
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.nodes.physical.stream
+
+import org.apache.flink.api.dag.Transformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.core.memory.ManagedMemoryUseCase
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.functions.python.PythonFunctionInfo
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory
+import org.apache.flink.table.planner.delegation.StreamPlanner
+import org.apache.flink.table.planner.plan.nodes.common.CommonPythonAggregate
+import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonOverAggregate.{
+  ARROW_PYTHON_OVER_WINDOW_RANGE_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_ROW_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME,
+  ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME
+}
+import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, OverAggregateUtil}
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo
+import org.apache.flink.table.types.logical.RowType
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.RelFieldCollation.Direction.ASCENDING
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.Window.Group
+import org.apache.calcite.rel.core.{AggregateCall, Window}
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+/**
+  * Stream physical RelNode for python time-based over [[Window]].
+  */
+class StreamExecPythonOverAggregate(
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputRel: RelNode,
+    outputRowType: RelDataType,
+    inputRowType: RelDataType,
+    logicWindow: Window)
+  extends StreamExecOverAggregateBase(
+    cluster,
+    traitSet,
+    inputRel,
+    outputRowType,
+    inputRowType,
+    logicWindow)
+  with CommonPythonAggregate {
+
+  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
+    new StreamExecPythonOverAggregate(
+      cluster,
+      traitSet,
+      inputs.get(0),
+      outputRowType,
+      inputRowType,
+      logicWindow
+      )
+  }
+
+  override protected def translateToPlanInternal(
+      planner: StreamPlanner): Transformation[RowData] = {
+    val tableConfig = planner.getTableConfig
+
+    val overWindow: Group = logicWindow.groups.get(0)
+
+    val orderKeys = overWindow.orderKeys.getFieldCollations
+
+    if (orderKeys.size() != 1) {
+      throw new TableException(
+        "The window can only be ordered by a single time column.")
+    }
+    val orderKey = orderKeys.get(0)
+
+    if (!orderKey.direction.equals(ASCENDING)) {
+      throw new TableException(
+        "The window can only be ordered in ASCENDING mode.")
+    }
+
+    val inputDS = getInputNodes.get(0).translateToPlan(planner)
+      .asInstanceOf[Transformation[RowData]]
+
+    if (!logicWindow.groups.get(0).keys.isEmpty && tableConfig.getMinIdleStateRetentionTime < 0) {
+      LOG.warn(
+        "No state retention interval configured for a query which accumulates state. " +
+          "Please provide a query configuration with valid retention interval to prevent " +
+          "excessive state size. You may specify a retention time of 0 to not clean up the state.")
+    }
+
+    val timeType = outputRowType.getFieldList.get(orderKey.getFieldIndex).getType
+
+    // check time field
+    if (!FlinkTypeFactory.isRowtimeIndicatorType(timeType)
+      && !FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      throw new TableException(
+        "OVER windows' ordering in stream mode must be defined on a time 
attribute.")
+    }
+
+    // identify window rowtime attribute
+    val rowTimeIdx: Option[Int] = if (FlinkTypeFactory.isRowtimeIndicatorType(timeType)) {
+      Some(orderKey.getFieldIndex)
+    } else if (FlinkTypeFactory.isProctimeIndicatorType(timeType)) {
+      None
+    } else {
+      throw new TableException(
+        "OVER windows can only be applied on time attributes.")
+    }
+
+    if (overWindow.lowerBound.isPreceding
+      && overWindow.lowerBound.isUnbounded) {
+      throw new TableException(
+        "OVER PRECEDING windows are not supported yet."

Review comment:
       ```suggestion
        "UNBOUNDED PRECEDING OVER windows are not supported yet for Python UDAF."
       ```

##########
File path: flink-python/pyflink/table/tests/test_pandas_udaf.py
##########
@@ -504,6 +504,129 @@ def test_tumbling_group_window_over_count(self):
         self.assert_equals(actual, ["1,2.5", "1,6.0", "2,2.0", "3,2.5"])
         os.remove(source_path)
 
+    def test_over_range_window_aggregate_function(self):
+        # create source file path
+        import tempfile
+        import os
+        tmp_dir = tempfile.gettempdir()
+        data = [
+            '1,1,2013-01-01 03:10:00',
+            '3,2,2013-01-01 03:10:00',
+            '2,1,2013-01-01 03:10:00',
+            '1,5,2013-01-01 03:10:00',
+            '1,8,2013-01-01 04:20:00',
+            '2,3,2013-01-01 03:30:00'
+        ]
+        source_path = tmp_dir + '/test_over_range_window_aggregate_function.csv'
+        with open(source_path, 'w') as fd:
+            for ele in data:
+                fd.write(ele + '\n')
+        max_add_min_udaf = udaf(lambda a: a.max() + a.min(),
+                                result_type=DataTypes.SMALLINT(),
+                                func_type='pandas')
+        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)

Review comment:
       Add test cases for processing time?
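
       For example, something along these lines (a rough, untested sketch mirroring the rowtime test above; the `source_table` name and its assumed `proctime AS PROCTIME()` column are illustrative, not part of this PR):

       ```python
       def test_over_range_window_aggregate_function_proc_time(self):
           # hypothetical proctime counterpart of the rowtime test above
           self.env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime)
           max_add_min_udaf = udaf(lambda a: a.max() + a.min(),
                                   result_type=DataTypes.SMALLINT(),
                                   func_type='pandas')
           self.t_env.register_function("max_add_min", max_add_min_udaf)
           # source_table is assumed to declare "proctime AS PROCTIME()" in its DDL
           result = self.t_env.sql_query(
               "SELECT a, max_add_min(b) OVER ("
               "PARTITION BY a ORDER BY proctime "
               "RANGE BETWEEN INTERVAL '2' SECOND PRECEDING AND CURRENT ROW) "
               "FROM source_table")
           # proctime output is nondeterministic, so any assertion would have to
           # be loose (e.g. on the number of result rows only)
       ```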

##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * The Abstract class of Stream Arrow Python {@link AggregateFunction} Operator for RANGE clause
+ * bounded Over Window Aggregation.
+ */
+@Internal
+public abstract class AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator<K>
+       extends AbstractStreamArrowPythonOverWindowAggregateFunctionOperator<K> {
+
+       private static final long serialVersionUID = 1L;
+
+       private transient LinkedList<List<RowData>> inputData;
+
+       public AbstractStreamArrowPythonRangeBoundedOverWindowAggregateFunctionOperator(
+               Configuration config,
+               PythonFunctionInfo[] pandasAggFunctions,
+               RowType inputType,
+               RowType outputType,
+               int inputTimeFieldIndex,
+               long lowerBoundary,
+               int[] groupingSet,
+               int[] udafInputOffsets) {
+               super(config, pandasAggFunctions, inputType, outputType, inputTimeFieldIndex, lowerBoundary,
+                       groupingSet, udafInputOffsets);
+       }
+
+       @Override
+       public void open() throws Exception {
+               super.open();
+               inputData = new LinkedList<>();
+       }
+
+       @Override
+       public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+               // if cleanupTsState has not been updated then it is safe to clean up the state
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       lastTriggeringTsState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+               // gets all window data from state for the calculation
+               List<RowData> inputs = inputState.get(timestamp);
+               triggerWindowProcess(timestamp, inputs);
+               lastTriggeringTsState.update(timestamp);
+       }
+
+       @Override
+       public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+               long timestamp = timer.getTimestamp();
+               Long cleanupTimestamp = cleanupTsState.value();
+               // if cleanupTsState has not been updated then it is safe to clean up the state
+               if (cleanupTimestamp != null && cleanupTimestamp <= timestamp) {
+                       inputState.clear();
+                       cleanupTsState.clear();
+                       return;
+               }
+
+               // we consider the original timestamp of events
+               // that have registered this time trigger 1 ms ago
+
+               long currentTime = timestamp - 1;
+
+               // get the list of elements of current proctime
+               List<RowData> currentElements = inputState.get(currentTime);
+               triggerWindowProcess(timestamp, currentElements);
+       }
+
+       @Override
+       @SuppressWarnings("ConstantConditions")
+       public void emitResult(Tuple2<byte[], Integer> resultTuple) throws Exception {
+               byte[] udafResult = resultTuple.f0;
+               int length = resultTuple.f1;
+               bais.setBuffer(udafResult, 0, length);
+               int rowCount = arrowSerializer.load();
+               for (int i = 0; i < rowCount; i++) {
+                       RowData data = arrowSerializer.read(i);
+                       List<RowData> input = inputData.poll();
+                       for (RowData ele : input) {
+                               reuseJoinedRow.setRowKind(ele.getRowKind());
+                               rowDataWrapper.collect(reuseJoinedRow.replace(ele, data));
+                       }
+               }
+       }
+
+       void registerCleanupTimer(long timestamp, TimeDomain domain) throws Exception {
+               long minCleanupTimestamp = timestamp + lowerBoundary + 1;
+               long maxCleanupTimestamp = timestamp + (long) (lowerBoundary * 1.5) + 1;
+               // update timestamp and register timer if needed
+               Long curCleanupTimestamp = cleanupTsState.value();
+               if (curCleanupTimestamp == null || curCleanupTimestamp < minCleanupTimestamp) {
+                       // we don't delete the existing timer since that might remove a timer still needed for data processing
+                       if (domain == TimeDomain.EVENT_TIME) {
+                               timerService.registerEventTimeTimer(maxCleanupTimestamp);
+                       } else {
+                               timerService.registerProcessingTimeTimer(maxCleanupTimestamp);
+                       }
+                       cleanupTsState.update(maxCleanupTimestamp);
+               }
+       }
+
+       private void triggerWindowProcess(long timestamp, List<RowData> inputs) throws Exception {

Review comment:
       ```suggestion
       private void triggerWindowProcess(long upperLimit, List<RowData> inputs) throws Exception {
       ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

