This is an automated email from the ASF dual-hosted git repository.

hxb pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 945c15341b9 [FLINK-27733][python] Rework on_timer output behind 
watermark bug fix
945c15341b9 is described below

commit 945c15341b93a9bfadc7b6ce239a96c2b7baf592
Author: Juntao Hu <[email protected]>
AuthorDate: Sun May 22 23:16:12 2022 +0800

    [FLINK-27733][python] Rework on_timer output behind watermark bug fix
    
    This closes #19788.
---
 .../python/AbstractPythonFunctionOperator.java     | 28 +++++++++++++-----
 .../python/PythonKeyedCoProcessOperator.java       | 33 ++++------------------
 .../python/PythonKeyedProcessOperator.java         | 33 ++++------------------
 .../operators/python/timer/TimerRegistration.java  | 16 -----------
 .../api/operators/python/timer/TimerUtils.java     | 30 --------------------
 ...thonStreamGroupWindowAggregateOperatorTest.java |  2 --
 ...onGroupWindowAggregateFunctionOperatorTest.java | 19 +++++++------
 ...ArrowPythonRowTimeBoundedRangeOperatorTest.java |  6 ++--
 ...mArrowPythonRowTimeBoundedRowsOperatorTest.java |  6 ++--
 9 files changed, 49 insertions(+), 124 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 2c04bc6ef78..98b85f6db9f 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.python.metric.FlinkMetricContainer;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
 import 
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
@@ -218,14 +219,18 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
         // Approach 1) is the easiest and gives better latency, yet 2)
         // gives better throughput due to the bundle not getting cut on
         // every watermark. So we have implemented 2) below.
+
+        // advance the watermark and do not emit watermark to downstream 
operators
+        if (getTimeServiceManager().isPresent()) {
+            getTimeServiceManager().get().advanceWatermark(mark);
+        }
+
         if (mark.getTimestamp() == Long.MAX_VALUE) {
             invokeFinishBundle();
             processElementsOfCurrentKeyIfNeeded(null);
-            preEmitWatermark(mark);
+            advanceWatermark(mark);
             output.emitWatermark(mark);
         } else if (isBundleFinished()) {
-            // forward the watermark immediately if the bundle is already 
finished.
-            preEmitWatermark(mark);
             output.emitWatermark(mark);
         } else {
             // It is not safe to advance the output watermark yet, so add a 
hold on the current
@@ -233,8 +238,8 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
             bundleFinishedCallback =
                     () -> {
                         try {
+                            advanceWatermark(mark);
                             // at this point the bundle is finished, allow the 
watermark to pass
-                            preEmitWatermark(mark);
                             output.emitWatermark(mark);
                         } catch (Exception e) {
                             throw new RuntimeException(
@@ -318,10 +323,19 @@ public abstract class AbstractPythonFunctionOperator<OUT> 
extends AbstractStream
         }
     }
 
-    /** Called before emitting watermark to downstream. */
-    protected void preEmitWatermark(Watermark mark) throws Exception {
+    /**
+     * Advances the watermark of all managed timer services, potentially 
firing event time timers.
+     * It also ensures that the fired timers are processed in the Python 
user-defined functions.
+     */
+    private void advanceWatermark(Watermark watermark) throws Exception {
         if (getTimeServiceManager().isPresent()) {
-            getTimeServiceManager().get().advanceWatermark(mark);
+            InternalTimeServiceManager<?> timeServiceManager = 
getTimeServiceManager().get();
+            timeServiceManager.advanceWatermark(watermark);
+
+            while (!isBundleFinished()) {
+                invokeFinishBundle();
+                timeServiceManager.advanceWatermark(watermark);
+            }
         }
     }
 
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index cff654ab428..2f2fcdf63ed 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.TimeDomain;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import 
org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
 import org.apache.flink.streaming.api.utils.ProtoUtils;
 import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 
@@ -58,9 +56,6 @@ public class PythonKeyedCoProcessOperator<OUT>
     /** TimerService for current operator to register or fire timer. */
     private transient InternalTimerService<VoidNamespace> internalTimerService;
 
-    /** TimerRegistration for handling timer registering. */
-    private transient TimerRegistration timerRegistration;
-
     /** The TypeInformation of the key. */
     private transient TypeInformation<Row> keyTypeInfo;
 
@@ -102,13 +97,6 @@ public class PythonKeyedCoProcessOperator<OUT>
                         timerDataTypeInfo);
 
         timerHandler = new TimerHandler();
-        timerRegistration =
-                new TimerRegistration(
-                        getKeyedStateBackend(),
-                        internalTimerService,
-                        this,
-                        VoidNamespaceSerializer.INSTANCE,
-                        timerDataSerializer);
 
         super.open();
     }
@@ -130,7 +118,12 @@ public class PythonKeyedCoProcessOperator<OUT>
                 getKeyedStateBackend(),
                 keyTypeSerializer,
                 null,
-                timerRegistration,
+                new TimerRegistration(
+                        getKeyedStateBackend(),
+                        internalTimerService,
+                        this,
+                        VoidNamespaceSerializer.INSTANCE,
+                        timerDataSerializer),
                 getContainingTask().getEnvironment().getMemoryManager(),
                 getOperatorConfig()
                         .getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -202,20 +195,6 @@ public class PythonKeyedCoProcessOperator<OUT>
         emitResults();
     }
 
-    @SuppressWarnings("rawtypes")
-    @Override
-    protected void preEmitWatermark(Watermark mark) throws Exception {
-        if (!getTimeServiceManager().isPresent()) {
-            return;
-        }
-        InternalTimeServiceManager timeServiceManager = 
getTimeServiceManager().get();
-        long timestamp = mark.getTimestamp();
-        do {
-            timeServiceManager.advanceWatermark(mark);
-            invokeFinishBundle();
-        } while 
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
-    }
-
     /**
      * As the beam state gRPC service will access the KeyedStateBackend in 
parallel with this
      * operator, we must override this method to prevent changing the current 
key of the
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
index 3905cd005ae..3d81c1076ba 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.TimeDomain;
 import 
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import 
org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
 import 
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
 import org.apache.flink.streaming.api.utils.ProtoUtils;
 import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Row;
 
@@ -65,9 +63,6 @@ public class PythonKeyedProcessOperator<OUT>
     /** TimerService for current operator to register or fire timer. */
     private transient InternalTimerService internalTimerService;
 
-    /** TimerRegistration for handling timer registering. */
-    private transient TimerRegistration timerRegistration;
-
     /** The TypeInformation of the key. */
     private transient TypeInformation<Row> keyTypeInfo;
 
@@ -122,13 +117,6 @@ public class PythonKeyedProcessOperator<OUT>
                         timerDataTypeInfo);
 
         timerHandler = new TimerHandler();
-        timerRegistration =
-                new TimerRegistration(
-                        getKeyedStateBackend(),
-                        internalTimerService,
-                        this,
-                        namespaceSerializer,
-                        timerDataSerializer);
 
         super.open();
     }
@@ -160,7 +148,12 @@ public class PythonKeyedProcessOperator<OUT>
                 getKeyedStateBackend(),
                 keyTypeSerializer,
                 namespaceSerializer,
-                timerRegistration,
+                new TimerRegistration(
+                        getKeyedStateBackend(),
+                        internalTimerService,
+                        this,
+                        namespaceSerializer,
+                        timerDataSerializer),
                 getContainingTask().getEnvironment().getMemoryManager(),
                 getOperatorConfig()
                         .getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -221,20 +214,6 @@ public class PythonKeyedProcessOperator<OUT>
         emitResults();
     }
 
-    @SuppressWarnings("rawtypes")
-    @Override
-    protected void preEmitWatermark(Watermark mark) throws Exception {
-        if (!getTimeServiceManager().isPresent()) {
-            return;
-        }
-        InternalTimeServiceManager timeServiceManager = 
getTimeServiceManager().get();
-        long timestamp = mark.getTimestamp();
-        do {
-            timeServiceManager.advanceWatermark(mark);
-            invokeFinishBundle();
-        } while 
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
-    }
-
     /**
      * As the beam state gRPC service will access the KeyedStateBackend in 
parallel with this
      * operator, we must override this method to prevent changing the current 
key of the
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
index 686025eadb1..1a7371cc723 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
@@ -22,13 +22,11 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
 import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
 import org.apache.flink.types.Row;
 
@@ -41,7 +39,6 @@ public final class TimerRegistration {
 
     private final KeyedStateBackend<Row> keyedStateBackend;
     private final InternalTimerService internalTimerService;
-    private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> 
internalEventTimeTimersQueue;
     private final KeyContext keyContext;
     private final TypeSerializer namespaceSerializer;
     private final TypeSerializer<Row> timerDataSerializer;
@@ -57,8 +54,6 @@ public final class TimerRegistration {
             throws Exception {
         this.keyedStateBackend = keyedStateBackend;
         this.internalTimerService = internalTimerService;
-        this.internalEventTimeTimersQueue =
-                
TimerUtils.getInternalEventTimeTimersQueue(internalTimerService);
         this.keyContext = keyContext;
         this.namespaceSerializer = namespaceSerializer;
         this.timerDataSerializer = timerDataSerializer;
@@ -111,17 +106,6 @@ public final class TimerRegistration {
         }
     }
 
-    /**
-     * Returns if there's any event-time timer in the queue, that should be 
triggered because
-     * watermark advance.
-     */
-    public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws 
Exception {
-        return TimerUtils.hasEventTimeTimerBeforeTimestamp(
-                internalEventTimeTimersQueue,
-                timestamp,
-                PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend));
-    }
-
     /** The flag for indicating the timer operation type. */
     private enum TimerOperandType {
         REGISTER_EVENT_TIMER((byte) 0),
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
index 4c9abf97ca1..aefd1834a24 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
@@ -22,14 +22,8 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
 import org.apache.flink.streaming.api.utils.ProtoUtils;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.lang.reflect.Field;
 
 /** Utilities for timer. */
 @Internal
@@ -48,28 +42,4 @@ public final class TimerUtils {
         return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
                 timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE, 
false);
     }
-
-    @SuppressWarnings("unchecked")
-    public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
-            getInternalEventTimeTimersQueue(InternalTimerService<?> 
internalTimerService)
-                    throws Exception {
-        Field queueField = 
internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue");
-        queueField.setAccessible(true);
-        return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>)
-                queueField.get(internalTimerService);
-    }
-
-    public static boolean hasEventTimeTimerBeforeTimestamp(
-            InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue,
-            long timestamp,
-            boolean isBatchMode)
-            throws Exception {
-        if (isBatchMode) {
-            Preconditions.checkArgument(timestamp == Long.MAX_VALUE);
-            return timerQueue.size() == 0;
-        }
-
-        TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek();
-        return minTimer == null || minTimer.getTimestamp() > timestamp;
-    }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
index feb85844217..16734ed6c37 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
@@ -213,8 +213,6 @@ public class PythonStreamGroupWindowAggregateOperatorTest
         testHarness.processElement(newRecord(true, initialTime + 3, "c1", 
"c6", 2L, 10000L));
         testHarness.processElement(newRecord(true, initialTime + 4, "c2", 
"c8", 3L, 0L));
         testHarness.processWatermark(new Watermark(20000L));
-        assertOutputEquals(
-                "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
         testHarness.setProcessingTime(1000L);
         expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index 6031c7f9441..2dc8a625311 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -91,8 +91,6 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         testHarness.processWatermark(Long.MAX_VALUE);
         testHarness.close();
 
-        expectedOutput.add(new Watermark(Long.MAX_VALUE));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -146,6 +144,8 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
 
@@ -172,8 +172,6 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         // checkpoint trigger finishBundle
         testHarness.prepareSnapshotPreBarrier(0L);
 
-        expectedOutput.add(new Watermark(10000L));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -210,14 +208,14 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(0L),
                                 TimestampData.fromEpochMillis(10000L))));
 
+        expectedOutput.add(new Watermark(10000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         testHarness.processWatermark(20000L);
 
         testHarness.close();
 
-        expectedOutput.add(new Watermark(20000L));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -235,6 +233,8 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(20000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
 
@@ -301,8 +301,6 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         testHarness.processWatermark(20000L);
         testHarness.close();
 
-        expectedOutput.add(new Watermark(20000L));
-
         expectedOutput.add(
                 new StreamRecord<>(
                         newRow(
@@ -320,6 +318,8 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(20000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
 
@@ -344,7 +344,6 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
         testHarness.processElement(
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L), 
initialTime + 4));
         testHarness.processWatermark(new Watermark(20000L));
-        expectedOutput.add(new Watermark(20000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
@@ -401,6 +400,8 @@ public class 
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
                                 TimestampData.fromEpochMillis(10000L),
                                 TimestampData.fromEpochMillis(20000L))));
 
+        expectedOutput.add(new Watermark(20000L));
+
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
         testHarness.close();
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index 7dce45700d6..0db2d8e58c7 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -80,11 +80,11 @@ public class 
StreamArrowPythonRowTimeBoundedRangeOperatorTest
 
         testHarness.close();
 
-        expectedOutput.add(new Watermark(Long.MAX_VALUE));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 2L)));
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
@@ -110,7 +110,6 @@ public class 
StreamArrowPythonRowTimeBoundedRangeOperatorTest
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
 
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
         // checkpoint trigger finishBundle
@@ -120,6 +119,7 @@ public class 
StreamArrowPythonRowTimeBoundedRangeOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 2L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
@@ -182,7 +182,6 @@ public class 
StreamArrowPythonRowTimeBoundedRangeOperatorTest
         testHarness.processElement(
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
@@ -191,6 +190,7 @@ public class 
StreamArrowPythonRowTimeBoundedRangeOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 2L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
diff --git 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index 26208053867..9385b1138e6 100644
--- 
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ 
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -76,11 +76,11 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
 
         testHarness.close();
 
-        expectedOutput.add(new Watermark(Long.MAX_VALUE));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 1L)));
+        expectedOutput.add(new Watermark(Long.MAX_VALUE));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
     }
@@ -106,7 +106,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
 
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
         // checkpoint trigger finishBundle
@@ -116,6 +115,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 1L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 
@@ -178,7 +178,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         testHarness.processElement(
                 new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L), 
initialTime + 3));
         testHarness.processWatermark(new Watermark(10000L));
-        expectedOutput.add(new Watermark(10000L));
         assertOutputEquals(
                 "FinishBundle should not be triggered.", expectedOutput, 
testHarness.getOutput());
 
@@ -187,6 +186,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L, 
0L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L, 
3L)));
         expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L, 
10L, 1L)));
+        expectedOutput.add(new Watermark(10000L));
 
         assertOutputEquals("Output was not correct.", expectedOutput, 
testHarness.getOutput());
 

Reply via email to