This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b63bae82e33 [FLINK-38506][tests] Fix flaky SideOutputITCase
b63bae82e33 is described below
commit b63bae82e33a0b6db2e270294d6ef1d2f73a66b5
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Feb 18 17:06:08 2026 +0100
[FLINK-38506][tests] Fix flaky SideOutputITCase
---
.../flink/test/streaming/runtime/SideOutputITCase.java | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 6a8b420b7ff..9e2007ecf01 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -1147,6 +1147,7 @@ public class SideOutputITCase extends
AbstractTestBaseJUnit4 implements Serializ
@Override
public SourceReader<Integer, TestSplit>
createReader(SourceReaderContext ctx) {
return new TestSourceReader<Integer>(ctx) {
+ private boolean noMoreSplits = false;
private boolean emitted = false;
@Override
@@ -1162,9 +1163,22 @@ public class SideOutputITCase extends
AbstractTestBaseJUnit4 implements Serializ
out.collect(3, 3);
out.collect(4, 4);
emitted = true;
- return InputStatus.END_OF_INPUT;
}
- return InputStatus.END_OF_INPUT;
+
+ // We're using SingleSplitEnumerator below which DOES
send operator events
+ // to the source tasks. Therefore, if we finish this
task prematurely (by
+ // returning END_OF_INPUT) such event delivery might
fail, causing job and
+ // test failure. Usually, this doesn't happen because
the task
+ // finishes before the RPC starts.
+ // To avoid flakiness, we intentinoally wait for
NO_MORE_SPLITS event on TM.
+ return noMoreSplits
+ ? InputStatus.END_OF_INPUT
+ : InputStatus.NOTHING_AVAILABLE;
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ this.noMoreSplits = true;
}
};
}