This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b63bae82e33 [FLINK-38506][tests] Fix flaky SideOutputITCase
b63bae82e33 is described below

commit b63bae82e33a0b6db2e270294d6ef1d2f73a66b5
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Feb 18 17:06:08 2026 +0100

    [FLINK-38506][tests] Fix flaky SideOutputITCase
---
 .../flink/test/streaming/runtime/SideOutputITCase.java | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index 6a8b420b7ff..9e2007ecf01 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -1147,6 +1147,7 @@ public class SideOutputITCase extends 
AbstractTestBaseJUnit4 implements Serializ
             @Override
             public SourceReader<Integer, TestSplit> 
createReader(SourceReaderContext ctx) {
                 return new TestSourceReader<Integer>(ctx) {
+                    private boolean noMoreSplits = false;
                     private boolean emitted = false;
 
                     @Override
@@ -1162,9 +1163,22 @@ public class SideOutputITCase extends 
AbstractTestBaseJUnit4 implements Serializ
                             out.collect(3, 3);
                             out.collect(4, 4);
                             emitted = true;
-                            return InputStatus.END_OF_INPUT;
                         }
-                        return InputStatus.END_OF_INPUT;
+
+                        // We're using SingleSplitEnumerator below which DOES 
send operator events
+                        // to the source tasks. Therefore, if we finish this 
task prematurely (by
+                        // returning END_OF_INPUT) such event delivery might 
fail, causing job and
+                        // test failure. Usually, this doesn't happen because 
the task
+                        // finishes before the RPC starts.
+                        // To avoid flakiness, we intentionally wait for 
NO_MORE_SPLITS event on TM.
+                        return noMoreSplits
+                                ? InputStatus.END_OF_INPUT
+                                : InputStatus.NOTHING_AVAILABLE;
+                    }
+
+                    @Override
+                    public void notifyNoMoreSplits() {
+                        this.noMoreSplits = true;
                     }
                 };
             }

Reply via email to