This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 259a6a705aa29458ac83d4b5c527bc5e0ced4ae3
Author: Roman Khachatryan <[email protected]>
AuthorDate: Sun Jan 18 22:22:18 2026 +0100

    [hotfix][tests] Close OutputWriter in SourceOperatorStreamTaskTest
---
 .../flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java  | 5 +++--
 .../flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java  | 4 ++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 5648fd96466..e4b4881ba31 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -277,8 +277,9 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
                         .finish()
                         .build()) {
 
-            testHarness.getStreamTask().invoke();
-            testHarness.processAll();
+            testHarness.getStreamTask().invoke(); // should result in end-of-input and task closing
+            testHarness.processAll(); // no-op
+            testHarness.cleanUp(); // close output writer
             assertThat(output)
                     .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index ddd06e313e3..0929f3940e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -164,6 +164,10 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable {
 
     public void finishProcessing() throws Exception {
         streamTask.afterInvoke();
+        cleanUp();
+    }
+
+    public  void cleanUp() throws Exception {
         streamTask.cleanUp(null);
     }
 

Reply via email to