This is an automated email from the ASF dual-hosted git repository.
hxb pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 945c15341b9 [FLINK-27733][python] Rework on_timer output behind watermark bug fix
945c15341b9 is described below
commit 945c15341b93a9bfadc7b6ce239a96c2b7baf592
Author: Juntao Hu <[email protected]>
AuthorDate: Sun May 22 23:16:12 2022 +0800
[FLINK-27733][python] Rework on_timer output behind watermark bug fix
This closes #19788.
---
.../python/AbstractPythonFunctionOperator.java | 28 +++++++++++++-----
.../python/PythonKeyedCoProcessOperator.java | 33 ++++------------------
.../python/PythonKeyedProcessOperator.java | 33 ++++------------------
.../operators/python/timer/TimerRegistration.java | 16 -----------
.../api/operators/python/timer/TimerUtils.java | 30 --------------------
...thonStreamGroupWindowAggregateOperatorTest.java | 2 --
...onGroupWindowAggregateFunctionOperatorTest.java | 19 +++++++------
...ArrowPythonRowTimeBoundedRangeOperatorTest.java | 6 ++--
...mArrowPythonRowTimeBoundedRowsOperatorTest.java | 6 ++--
9 files changed, 49 insertions(+), 124 deletions(-)
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
index 2c04bc6ef78..98b85f6db9f 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/AbstractPythonFunctionOperator.java
@@ -31,6 +31,7 @@ import org.apache.flink.python.metric.FlinkMetricContainer;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import
org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionKeyedStateBackend;
import
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner;
@@ -218,14 +219,18 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
// Approach 1) is the easiest and gives better latency, yet 2)
// gives better throughput due to the bundle not getting cut on
// every watermark. So we have implemented 2) below.
+
+ // advance the watermark and do not emit watermark to downstream
operators
+ if (getTimeServiceManager().isPresent()) {
+ getTimeServiceManager().get().advanceWatermark(mark);
+ }
+
if (mark.getTimestamp() == Long.MAX_VALUE) {
invokeFinishBundle();
processElementsOfCurrentKeyIfNeeded(null);
- preEmitWatermark(mark);
+ advanceWatermark(mark);
output.emitWatermark(mark);
} else if (isBundleFinished()) {
- // forward the watermark immediately if the bundle is already
finished.
- preEmitWatermark(mark);
output.emitWatermark(mark);
} else {
// It is not safe to advance the output watermark yet, so add a
hold on the current
@@ -233,8 +238,8 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
bundleFinishedCallback =
() -> {
try {
+ advanceWatermark(mark);
// at this point the bundle is finished, allow the
watermark to pass
- preEmitWatermark(mark);
output.emitWatermark(mark);
} catch (Exception e) {
throw new RuntimeException(
@@ -318,10 +323,19 @@ public abstract class AbstractPythonFunctionOperator<OUT>
extends AbstractStream
}
}
- /** Called before emitting watermark to downstream. */
- protected void preEmitWatermark(Watermark mark) throws Exception {
+ /**
+ * Advances the watermark of all managed timer services, potentially
firing event time timers.
+ * It also ensures that the fired timers are processed in the Python
user-defined functions.
+ */
+ private void advanceWatermark(Watermark watermark) throws Exception {
if (getTimeServiceManager().isPresent()) {
- getTimeServiceManager().get().advanceWatermark(mark);
+ InternalTimeServiceManager<?> timeServiceManager =
getTimeServiceManager().get();
+ timeServiceManager.advanceWatermark(watermark);
+
+ while (!isBundleFinished()) {
+ invokeFinishBundle();
+ timeServiceManager.advanceWatermark(watermark);
+ }
}
}
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
index cff654ab428..2f2fcdf63ed 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedCoProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import
org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
@@ -58,9 +56,6 @@ public class PythonKeyedCoProcessOperator<OUT>
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService<VoidNamespace> internalTimerService;
- /** TimerRegistration for handling timer registering. */
- private transient TimerRegistration timerRegistration;
-
/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;
@@ -102,13 +97,6 @@ public class PythonKeyedCoProcessOperator<OUT>
timerDataTypeInfo);
timerHandler = new TimerHandler();
- timerRegistration =
- new TimerRegistration(
- getKeyedStateBackend(),
- internalTimerService,
- this,
- VoidNamespaceSerializer.INSTANCE,
- timerDataSerializer);
super.open();
}
@@ -130,7 +118,12 @@ public class PythonKeyedCoProcessOperator<OUT>
getKeyedStateBackend(),
keyTypeSerializer,
null,
- timerRegistration,
+ new TimerRegistration(
+ getKeyedStateBackend(),
+ internalTimerService,
+ this,
+ VoidNamespaceSerializer.INSTANCE,
+ timerDataSerializer),
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -202,20 +195,6 @@ public class PythonKeyedCoProcessOperator<OUT>
emitResults();
}
- @SuppressWarnings("rawtypes")
- @Override
- protected void preEmitWatermark(Watermark mark) throws Exception {
- if (!getTimeServiceManager().isPresent()) {
- return;
- }
- InternalTimeServiceManager timeServiceManager =
getTimeServiceManager().get();
- long timestamp = mark.getTimestamp();
- do {
- timeServiceManager.advanceWatermark(mark);
- invokeFinishBundle();
- } while
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
- }
-
/**
* As the beam state gRPC service will access the KeyedStateBackend in
parallel with this
* operator, we must override this method to prevent changing the current
key of the
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
index 3905cd005ae..3d81c1076ba 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/PythonKeyedProcessOperator.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.TimeDomain;
import
org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
-import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
@@ -38,7 +37,6 @@ import
org.apache.flink.streaming.api.operators.python.timer.TimerRegistration;
import
org.apache.flink.streaming.api.runners.python.beam.BeamDataStreamPythonFunctionRunner;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Row;
@@ -65,9 +63,6 @@ public class PythonKeyedProcessOperator<OUT>
/** TimerService for current operator to register or fire timer. */
private transient InternalTimerService internalTimerService;
- /** TimerRegistration for handling timer registering. */
- private transient TimerRegistration timerRegistration;
-
/** The TypeInformation of the key. */
private transient TypeInformation<Row> keyTypeInfo;
@@ -122,13 +117,6 @@ public class PythonKeyedProcessOperator<OUT>
timerDataTypeInfo);
timerHandler = new TimerHandler();
- timerRegistration =
- new TimerRegistration(
- getKeyedStateBackend(),
- internalTimerService,
- this,
- namespaceSerializer,
- timerDataSerializer);
super.open();
}
@@ -160,7 +148,12 @@ public class PythonKeyedProcessOperator<OUT>
getKeyedStateBackend(),
keyTypeSerializer,
namespaceSerializer,
- timerRegistration,
+ new TimerRegistration(
+ getKeyedStateBackend(),
+ internalTimerService,
+ this,
+ namespaceSerializer,
+ timerDataSerializer),
getContainingTask().getEnvironment().getMemoryManager(),
getOperatorConfig()
.getManagedMemoryFractionOperatorUseCaseOfSlot(
@@ -221,20 +214,6 @@ public class PythonKeyedProcessOperator<OUT>
emitResults();
}
- @SuppressWarnings("rawtypes")
- @Override
- protected void preEmitWatermark(Watermark mark) throws Exception {
- if (!getTimeServiceManager().isPresent()) {
- return;
- }
- InternalTimeServiceManager timeServiceManager =
getTimeServiceManager().get();
- long timestamp = mark.getTimestamp();
- do {
- timeServiceManager.advanceWatermark(mark);
- invokeFinishBundle();
- } while
(!timerRegistration.hasEventTimeTimerBeforeTimestamp(timestamp));
- }
-
/**
* As the beam state gRPC service will access the KeyedStateBackend in
parallel with this
* operator, we must override this method to prevent changing the current
key of the
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
index 686025eadb1..1a7371cc723 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerRegistration.java
@@ -22,13 +22,11 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.types.Row;
@@ -41,7 +39,6 @@ public final class TimerRegistration {
private final KeyedStateBackend<Row> keyedStateBackend;
private final InternalTimerService internalTimerService;
- private final InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
internalEventTimeTimersQueue;
private final KeyContext keyContext;
private final TypeSerializer namespaceSerializer;
private final TypeSerializer<Row> timerDataSerializer;
@@ -57,8 +54,6 @@ public final class TimerRegistration {
throws Exception {
this.keyedStateBackend = keyedStateBackend;
this.internalTimerService = internalTimerService;
- this.internalEventTimeTimersQueue =
-
TimerUtils.getInternalEventTimeTimersQueue(internalTimerService);
this.keyContext = keyContext;
this.namespaceSerializer = namespaceSerializer;
this.timerDataSerializer = timerDataSerializer;
@@ -111,17 +106,6 @@ public final class TimerRegistration {
}
}
- /**
- * Returns if there's any event-time timer in the queue, that should be
triggered because
- * watermark advance.
- */
- public boolean hasEventTimeTimerBeforeTimestamp(long timestamp) throws
Exception {
- return TimerUtils.hasEventTimeTimerBeforeTimestamp(
- internalEventTimeTimersQueue,
- timestamp,
- PythonOperatorUtils.inBatchExecutionMode(keyedStateBackend));
- }
-
/** The flag for indicating the timer operation type. */
private enum TimerOperandType {
REGISTER_EVENT_TIMER((byte) 0),
diff --git
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
index 4c9abf97ca1..aefd1834a24 100644
---
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
+++
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/timer/TimerUtils.java
@@ -22,14 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
-import org.apache.flink.runtime.state.InternalPriorityQueue;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimerHeapInternalTimer;
import org.apache.flink.streaming.api.utils.ProtoUtils;
import org.apache.flink.types.Row;
-import org.apache.flink.util.Preconditions;
-
-import java.lang.reflect.Field;
/** Utilities for timer. */
@Internal
@@ -48,28 +42,4 @@ public final class TimerUtils {
return ProtoUtils.createRawTypeCoderInfoDescriptorProto(
timerDataType, FlinkFnApi.CoderInfoDescriptor.Mode.SINGLE,
false);
}
-
- @SuppressWarnings("unchecked")
- public static InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>
- getInternalEventTimeTimersQueue(InternalTimerService<?>
internalTimerService)
- throws Exception {
- Field queueField =
internalTimerService.getClass().getDeclaredField("eventTimeTimersQueue");
- queueField.setAccessible(true);
- return (InternalPriorityQueue<TimerHeapInternalTimer<?, ?>>)
- queueField.get(internalTimerService);
- }
-
- public static boolean hasEventTimeTimerBeforeTimestamp(
- InternalPriorityQueue<TimerHeapInternalTimer<?, ?>> timerQueue,
- long timestamp,
- boolean isBatchMode)
- throws Exception {
- if (isBatchMode) {
- Preconditions.checkArgument(timestamp == Long.MAX_VALUE);
- return timerQueue.size() == 0;
- }
-
- TimerHeapInternalTimer<?, ?> minTimer = timerQueue.peek();
- return minTimer == null || minTimer.getTimestamp() > timestamp;
- }
}
diff --git
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
index feb85844217..16734ed6c37 100644
---
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
+++
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperatorTest.java
@@ -213,8 +213,6 @@ public class PythonStreamGroupWindowAggregateOperatorTest
testHarness.processElement(newRecord(true, initialTime + 3, "c1",
"c6", 2L, 10000L));
testHarness.processElement(newRecord(true, initialTime + 4, "c2",
"c8", 3L, 0L));
testHarness.processWatermark(new Watermark(20000L));
- assertOutputEquals(
- "FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
testHarness.setProcessingTime(1000L);
expectedOutput.add(newWindowRecord(-5000L, 5000L, "c1", 0L));
diff --git
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
index 6031c7f9441..2dc8a625311 100644
---
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
+++
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
@@ -91,8 +91,6 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
testHarness.processWatermark(Long.MAX_VALUE);
testHarness.close();
- expectedOutput.add(new Watermark(Long.MAX_VALUE));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -146,6 +144,8 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(Long.MAX_VALUE));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -172,8 +172,6 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
// checkpoint trigger finishBundle
testHarness.prepareSnapshotPreBarrier(0L);
- expectedOutput.add(new Watermark(10000L));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -210,14 +208,14 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(0L),
TimestampData.fromEpochMillis(10000L))));
+ expectedOutput.add(new Watermark(10000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
testHarness.processWatermark(20000L);
testHarness.close();
- expectedOutput.add(new Watermark(20000L));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -235,6 +233,8 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(20000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -301,8 +301,6 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
testHarness.processWatermark(20000L);
testHarness.close();
- expectedOutput.add(new Watermark(20000L));
-
expectedOutput.add(
new StreamRecord<>(
newRow(
@@ -320,6 +318,8 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(20000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -344,7 +344,6 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
testHarness.processElement(
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 0L),
initialTime + 4));
testHarness.processWatermark(new Watermark(20000L));
- expectedOutput.add(new Watermark(20000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
@@ -401,6 +400,8 @@ public class
StreamArrowPythonGroupWindowAggregateFunctionOperatorTest
TimestampData.fromEpochMillis(10000L),
TimestampData.fromEpochMillis(20000L))));
+ expectedOutput.add(new Watermark(20000L));
+
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
testHarness.close();
diff --git
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
index 7dce45700d6..0db2d8e58c7 100644
---
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
+++
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRangeOperatorTest.java
@@ -80,11 +80,11 @@ public class
StreamArrowPythonRowTimeBoundedRangeOperatorTest
testHarness.close();
- expectedOutput.add(new Watermark(Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 2L)));
+ expectedOutput.add(new Watermark(Long.MAX_VALUE));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -110,7 +110,6 @@ public class
StreamArrowPythonRowTimeBoundedRangeOperatorTest
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
// checkpoint trigger finishBundle
@@ -120,6 +119,7 @@ public class
StreamArrowPythonRowTimeBoundedRangeOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 2L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
@@ -182,7 +182,6 @@ public class
StreamArrowPythonRowTimeBoundedRangeOperatorTest
testHarness.processElement(
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
@@ -191,6 +190,7 @@ public class
StreamArrowPythonRowTimeBoundedRangeOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 2L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
diff --git
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
index 26208053867..9385b1138e6 100644
---
a/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
+++
b/flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonRowTimeBoundedRowsOperatorTest.java
@@ -76,11 +76,11 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
testHarness.close();
- expectedOutput.add(new Watermark(Long.MAX_VALUE));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c2", 0L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 1L)));
+ expectedOutput.add(new Watermark(Long.MAX_VALUE));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
}
@@ -106,7 +106,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
// checkpoint trigger finishBundle
@@ -116,6 +115,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 1L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());
@@ -178,7 +178,6 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
testHarness.processElement(
new StreamRecord<>(newBinaryRow(true, "c2", "c8", 3L, 2L),
initialTime + 3));
testHarness.processWatermark(new Watermark(10000L));
- expectedOutput.add(new Watermark(10000L));
assertOutputEquals(
"FinishBundle should not be triggered.", expectedOutput,
testHarness.getOutput());
@@ -187,6 +186,7 @@ public class StreamArrowPythonRowTimeBoundedRowsOperatorTest
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c4", 1L, 1L,
0L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c2", "c8", 3L, 2L,
3L)));
expectedOutput.add(new StreamRecord<>(newRow(true, "c1", "c6", 2L,
10L, 1L)));
+ expectedOutput.add(new Watermark(10000L));
assertOutputEquals("Output was not correct.", expectedOutput,
testHarness.getOutput());