dianfu commented on a change in pull request #13504:
URL: https://github.com/apache/flink/pull/13504#discussion_r504406366



##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/AbstractStreamArrowPythonBoundedRowsOperator.java
##########
@@ -158,88 +161,46 @@ void registerProcessingCleanupTimer(long currentTime) 
throws Exception {
        }
 
        void triggerWindowProcess(List<RowData> inputs, int i, int index) 
throws Exception {
-               int startIndex;
-               int startPos = 0;
                if (windowData.isEmpty()) {
                        if (i >= lowerBoundary) {
                                for (int j = (int) (i - lowerBoundary); j <= i; 
j++) {
-                                       RowData rowData = inputs.get(j);
-                                       windowData.add(rowData);
-                                       
arrowSerializer.write(getFunctionInput(rowData));
+                                       windowData.add(inputs.get(j));
                                }
                                currentBatchCount += lowerBoundary;
                        } else {
+                               for (int j = 0; j <= i; j++) {
+                                       RowData rowData = inputs.get(j);
+                                       windowData.add(rowData);
+                                       currentBatchCount++;
+                               }
                                Long previousTimestamp;
-                               List<RowData> previousData = null;
-                               int length = 0;
-                               startIndex = index - 1;
+                               List<RowData> previousData;
+                               int length;
                                long remainingDataCount = lowerBoundary - i;
                                ListIterator<Long> iter = 
sortedTimestamps.listIterator(index);
                                while (remainingDataCount > 0 && 
iter.hasPrevious()) {
                                        previousTimestamp = iter.previous();
                                        previousData = 
inputState.get(previousTimestamp);
                                        length = previousData.size();
-                                       if (remainingDataCount <= length) {
-                                               startPos = (int) (length - 
remainingDataCount);
-                                               remainingDataCount = 0;
-                                       } else {
-                                               remainingDataCount -= length;
-                                               startIndex--;
-                                       }
-                               }
-                               if (previousData != null) {
-                                       for (int j = startPos; j < length; j++) 
{
-                                               RowData rowData = 
previousData.get(j);
-                                               windowData.add(rowData);
-                                               
arrowSerializer.write(getFunctionInput(rowData));
+                                       ListIterator<RowData> previousDataIter 
= previousData.listIterator(length);
+                                       while (previousDataIter.hasPrevious() 
&& remainingDataCount > 0) {
+                                               
windowData.addFirst(previousDataIter.previous());
+                                               remainingDataCount--;
                                                currentBatchCount++;
                                        }
-                                       // clear outdated data.

Review comment:
       The logic that clears outdated state (the removed `// clear outdated data.` section above) is missing from the latest version of this PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to