This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a0ef9eb46ad [FLINK-27733][python] Rework on_timer output behind watermark bug fix
a0ef9eb46ad is described below
commit a0ef9eb46ad3896d6d87595dbe364f69d583794c
Author: Juntao Hu <[email protected]>
AuthorDate: Sun May 22 23:16:12 2022 +0800
[FLINK-27733][python] Rework on_timer output behind watermark bug fix
This closes #19788.
---
.../python/AbstractPythonFunctionOperator.java | 28 +++++++++++++-----
.../python/PythonKeyedCoProcessOperator.java | 33 ++++------------------
.../python/PythonKeyedProcessOperator.java | 33 ++++------------------
.../operators/python/timer/TimerRegistration.java | 16 -----------
.../api/operators/python/timer/TimerUtils.java | 30 --------------------
...thonStreamGroupWindowAggregateOperatorTest.java | 2 --
...onGroupWindowAggregateFunctionOperatorTest.java | 19 +++++++------
...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 6 ++--
...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 6 ++--
9 files changed, 49 insertions(+), 124 deletions(-)
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index f229ea7023c..5324df04f8a 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -180,14 +181,18 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
// Approach 1) is the easiest and gives better latency, yet 2)
// gives better throughput due to the bundle not getting cut on
// every watermark. So we have implemented 2) below.
+
+ // advance the watermark and do not emit watermark to downstream
operators
+ if (getTimeServiceManager().isPresent()) {
+ getTimeServiceManager().get().advanceWatermark(mark);
+ }
+
if (mark.getTimestamp() == Long.MAX_VALUE) {
invokeFinishBundle();
processElementsOfCurrentKeyIfNeeded(null);
- preEmitWatermark(mark);
+ advanceWatermark(mark);
output.emitWatermark(mark);
} else if (isBundleFinished()) {
- // forward the watermark immediately if the bundle is already
finished.
- preEmitWatermark(mark);
output.emitWatermark(mark);
} else {
// It is not safe to advance the output watermark yet, so add a
hold on the current
@@ -195,8 +200,8 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
bundleFinishedCallback =
() -> {
try {
+ advanceWatermark(mark);
// at this point the bundle is finished, allow the
watermark to pass
- preEmitWatermark(mark);
output.emitWatermark(mark);
} catch (Exception e) {
throw new RuntimeException(
@@ -263,10 +268,19 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
protected abstract PythonEnvironmentManager
createPythonEnvironmentManager();
- /** Called before emitting watermark to downstream. */
- protected void preEmitWatermark(Watermark mark) throws Exception {
+ /**
+ * Advances the watermark of all managed timer services, potentially
firing event time timers.
+ * It also ensures that the fired timers are processed in the Python
user-defined functions.
+ */
+ private void advanceWatermark(Watermark watermark) throws Exception {
if (getTimeServiceManager().isPresent()) {
- getTimeServiceManager().get().advanceWatermark(mark);
+ InternalTimeServiceManager<?> timeServiceManager =
getTimeServiceManager().get();
+ timeServiceManager.advanceWatermark(watermark);
+
+ while (!isBundleFinished()) {
+ invokeFinishBundle();
+ timeServiceManager.advanceWatermark(watermark);
+ }
}
}
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index f3f65de3759..f4c317feed7 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import
org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
@@ -63,9 +61,6 @@ public class PythonKeyedCoProcessOperator<OUT>
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService<VoidNamespace> internalTimerService;
- /** TimerRegistration for handling timer registering. */
- private transient TimerRegistration timerRegistration;
-
/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;
@@ -107,13 +102,6 @@ public class PythonKeyedCoProcessOperator<OUT>
timerDataTypeInfo);
timerHandler = new TimerHandler();
- timerRegistration =
- new TimerRegistration(
- getKeyedStateBackend(),
- internalTimerService,
- this,
- VoidNamespaceSerializer.INSTANCE,
- timerDataSerializer);
super.open();
}
@@ -141,7 +129,12 @@ public class PythonKeyedCoProcessOperator<OUT>
getOperatorStateBackend(),
keyTypeSerializer,
null,
- timerRegistration,
+ new TimerRegistration(
+ getKeyedStateBackend(),
+ internalTimerService,
+ this,
+ VoidNamespaceSerializer.INSTANCE,
+ timerDataSerializer),
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -214,20 +207,6 @@ public class PythonKeyedCoProcessOperator<OUT>
emitResults();
}
- @SuppressWarnings("rawtypes")
- @Override
- protected void preEmitWatermark(Watermark mark) throws Exception {
- if (!getTimeServiceManager().isPresent()) {
- return;
- }
- InternalTimeServiceManager timeServiceManager =
getTimeServiceManager().get();
- long timestamp = mark.getTimestamp();
- do {
- timeServiceManager.advanceWatermark(mark);
- invokeFinishBundle();
- } while
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
- }
-
/**
* As the beam state gRPC service will access the KeyedStateBackend in
parallel with this
* operator, we must override this method to prevent changing the current
key of the
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
index fdc870d75c7..3bcf1c382e4 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import
org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
@@ -70,9 +68,6 @@ public class PythonKeyedProcessOperator<OUT>
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService internalTimerService;
- /** TimerRegistration for handling timer registering. */
- private transient TimerRegistration timerRegistration;
-
/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;
@@ -127,13 +122,6 @@ public class PythonKeyedProcessOperator<OUT>
timerDataTypeInfo);
timerHandler = new TimerHandler();
- timerRegistration =
- new TimerRegistration(
- getKeyedStateBackend(),
- internalTimerService,
- this,
- namespaceSerializer,
- timerDataSerializer);
super.open();
}
@@ -171,7 +159,12 @@ public class PythonKeyedProcessOperator<OUT>
getOperatorStateBackend(),
keyTypeSerializer,
namespaceSerializer,
- timerRegistration,
+ new TimerRegistration(
+ getKeyedStateBackend(),
+ internalTimerService,
+ this,
+ namespaceSerializer,
+ timerDataSerializer),
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -233,20 +226,6 @@ public class PythonKeyedProcessOperator<OUT>
emitResults();
}
- @SuppressWarnings("rawtypes")
- @Override
- protected void preEmitWatermark(Watermark mark) throws Exception {
- if (!getTimeServiceManager().isPresent()) {
- return;
- }
- InternalTimeServiceManager timeServiceManager =
getTimeServiceManager().get();
- long timestamp = mark.getTimestamp();
- do {
- timeServiceManager.advanceWatermark(mark);
- invokeFinishBundle();
- } while
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
- }
-
/**
* As the beam state gRPC service will access the KeyedStateBackend in
parallel with this
* operator, we must override this method to prevent changing the current
key of the
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
index 686025eadb1..1a7371cc723 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
@@ -22,13 +22,11 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.types.Row;
@@ -41,7 +39,6 @@ public final class TimerRegistration {
private final KeyedStateBackend<Row> keyedStateBackend;
private final InternalTimerService internalTimerService;
- private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
internalEventTimeTimersQueue;
private final KeyContext keyContext;
private final TypeSerializer namespaceSerializer;
private final TypeSerializer<Row> timerDataSerializer;
@@ -57,8 +54,6 @@ public final class TimerRegistration {
throws Exception {
this.keyedStateBackend = keyedStateBackend;
this.internalTimerService = internalTimerService;
- this.internalEventTimeTimersQueue =
-
TimerUtils.getInternalEventTimeTimersQueue(internalTimerService);
this.keyContext = keyContext;
this.namespaceSerializer = namespaceSerializer;
this.timerDataSerializer = timerDataSerializer;
@@ -111,17 +106,6 @@ public final class TimerRegistration {
}
}
- /**
- * Returns if there's any event-time timer in the queue, that should be
triggered because
- * watermark advance.
- */
- public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws
Exception {
- return TimerUtils.hasEventTimeTimerBeforeTimestamp(
- internalEventTimeTimersQueue,
- timestamp,
- PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend));
- }
-
/** The flag for indicating the timer operation type. */
private enum TimerOperandType {
REGISTER_EVENT_TIMER((byte) 0),
diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
index 4c9abf97ca1..aefd1834a24 100644
--- a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
@@ -22,14 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.lang.reflect.Field;
/** Utilities for timer. */
@Internal
@@ -48,28 +42,4 @@ public final class TimerUtils {
return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE,
false);
}
-
- @SuppressWarnings("unchecked")
- public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
- getInternalEventTimeTimersQueue(InternalTimerService<?>
internalTimerService)
- throws Exception {
- Field queueField =
internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue");
- queueField.setAccessible(true);
- return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>)
- queueField.get(internalTimerService);
- }
-
- public static boolean hasEventTimeTimerBeforeTimestamp(
- InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue,
- long timestamp,
- boolean isBatchMode)
- throws Exception {
- if (isBatchMode) {
- Preconditions.checkArgument(timestamp == Long.MAX_VALUE);
- return timerQueue.size() == 0;
- }
-
- TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek();
- return minTimer == null || minTimer.getTimestamp() > timestamp;
- }
}
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
index 7aec55138bd..8b36e353adf 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
@@ -207,8 +207,6 @@ class PythonStreamGroupWindowAggregateOperatorTest
testHarness.processElement(newRecord(true, initialTime + 3, "c1",
"c6", 2L, 10000L));
testHarness.processElement(newRecord(true, initialTime + 4, "c2",
"c8", 3L, 0L));
testHarness.processWatermark(new Watermark(20000L));
- assertOutputEquals(
- "FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
testHarness.setProcessingTime(1000L);
expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index 8d44696d3c8..d1fcad02ee4 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -94,8 +94,6 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
testHarness.processWatermark(Long.MAX_VALUE);
testHarness.close();
- expectedOutput.add(new Watermark(Long.MAX_VALUE));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -149,6 +147,8 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -175,8 +175,6 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
// checkpoint trigger finishBundle
testHarness.prepareSnapshotPreBarrier(0L);
- expectedOutput.add(new Watermark(10000L));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -213,14 +211,14 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(0L),
TimestampData.fromEpochMillis(10000L))));
+ expectedOutput.add(new Watermark(10000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
testHarness.processWatermark(20000L);
testHarness.close();
- expectedOutput.add(new Watermark(20000L));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -238,6 +236,8 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(20000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -304,8 +304,6 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
testHarness.processWatermark(20000L);
testHarness.close();
- expectedOutput.add(new Watermark(20000L));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -323,6 +321,8 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(20000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -347,7 +347,6 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
testHarness.processElement(
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L),
initialTime + 4));
testHarness.processWatermark(new Watermark(20000L));
- expectedOutput.add(new Watermark(20000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
@@ -404,6 +403,8 @@ class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(20000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
testHarness.close();
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index 2452222586a..0f489dccffb 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -83,11 +83,11 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
testHarness.close();
- expectedOutput.add(new Watermark(Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 2L)));
+ expectedOutput.add(new Watermark(Long.MAX_VALUE));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -113,7 +113,6 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
// checkpoint trigger finishBundle
@@ -123,6 +122,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 2L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
@@ -185,7 +185,6 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
testHarness.processElement(
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
@@ -194,6 +193,7 @@ class StreamArrowPythonRowTimeBoundedRangeOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 2L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
diff --git a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index b27e9206103..0c200842910 100644
--- a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++ b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -79,11 +79,11 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
testHarness.close();
- expectedOutput.add(new Watermark(Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 1L)));
+ expectedOutput.add(new Watermark(Long.MAX_VALUE));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -109,7 +109,6 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
// checkpoint trigger finishBundle
@@ -119,6 +118,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 1L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
@@ -181,7 +181,6 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
testHarness.processElement(
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
@@ -190,6 +189,7 @@ class StreamArrowPythonRowTimeBoundedRowsOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 1L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());