This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 259a6a705aa29458ac83d4b5c527bc5e0ced4ae3
Author: Roman Khachatryan <[email protected]>
AuthorDate: Sun Jan 18 22:22:18 2026 +0100

    [hotfix][tests] Close OutputWriter in SourceOperatorStreamTaskTest
---
 .../flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java | 5 +++--
 .../flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java | 4 ++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index 5648fd96466..e4b4881ba31 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -277,8 +277,9 @@ class SourceOperatorStreamTaskTest extends SourceStreamTaskTestBase {
                         .finish()
                         .build()) {
 
-            testHarness.getStreamTask().invoke();
-            testHarness.processAll();
+            testHarness.getStreamTask().invoke(); // should result in end-of-input and task closing
+            testHarness.processAll(); // no-op
+            testHarness.cleanUp(); // close output writer
 
             assertThat(output)
                     .containsExactly(Watermark.MAX_WATERMARK, new EndOfData(StopMode.DRAIN));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
index ddd06e313e3..0929f3940e7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarness.java
@@ -164,6 +164,10 @@ public class StreamTaskMailboxTestHarness<OUT> implements AutoCloseable {
 
     public void finishProcessing() throws Exception {
         streamTask.afterInvoke();
+        cleanUp();
+    }
+
+    public void cleanUp() throws Exception {
         streamTask.cleanUp(null);
     }
 
