This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8e0b66bf750bd911024079a097f894f538d7793 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sat Dec 5 13:47:26 2020 +0800 [FLINK-20492][runtime] SourceOperator.dispose() should close the source reader. --- .../api/connector/source/mocks/MockSourceReader.java | 12 ++++++++---- .../flink/streaming/api/operators/SourceOperator.java | 17 ++++++++++++++--- .../runtime/tasks/SourceOperatorStreamTask.java | 5 +++++ .../streaming/api/operators/SourceOperatorTest.java | 12 ++++++++++++ .../runtime/tasks/SourceOperatorStreamTaskTest.java | 3 +++ 5 files changed, 42 insertions(+), 7 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index 8eb4596..7008ae4 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -41,7 +41,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> private int currentSplitIndex = 0; private boolean started; - private boolean closed; + private int timesClosed; private boolean waitingForMoreSplits; @GuardedBy("this") @@ -53,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> public MockSourceReader(boolean waitingForMoreSplits, boolean markIdleOnNoSplits) { this.started = false; - this.closed = false; + this.timesClosed = 0; this.availableFuture = CompletableFuture.completedFuture(null); this.waitingForMoreSplits = waitingForMoreSplits; this.markIdleOnNoSplits = markIdleOnNoSplits; @@ -115,7 +115,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> @Override public void close() throws Exception { - this.closed = true; + timesClosed++; } @Override @@ -158,7 +158,11 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } public boolean isClosed() { - return closed; + return timesClosed > 0; + } + + public int getTimesClosed() { + return timesClosed; } public List<MockSourceSplit> getAssignedSplits() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index b4d6db4..ab79f75 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -218,16 +218,27 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> @Override public void close() throws Exception { - if (sourceReader != null) { - sourceReader.close(); - } if (eventTimeLogic != null) { eventTimeLogic.stopPeriodicWatermarkEmits(); } + if (sourceReader != null) { + sourceReader.close(); + // Set the field to null so the reader won't be closed again in dispose(). + sourceReader = null; + } super.close(); } @Override + public void dispose() throws Exception { + // We also need to close the source reader to make sure the resources + // are released if the task does not finish normally. + if (sourceReader != null) { + sourceReader.close(); + } + } + + @Override public InputStatus emitNext(DataOutput<OUT> output) throws Exception { // guarding an assumptions we currently make due to the fact that certain classes // assume a constant output diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java index d55e542..beeeec7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java @@ -56,6 +56,11 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T, } @Override + protected CompletableFuture<Void> getCompletionFuture() { + return super.getCompletionFuture(); + } + + @Override public void init() throws Exception { final SourceOperator<T, ?> sourceOperator = headOperator; // reader initialization, which cannot happen in the constructor due to the diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java index c0ce943..5e16b60 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java @@ -91,6 +91,7 @@ public class SourceOperatorTest { @After public void cleanUp() throws Exception { operator.close(); + operator.dispose(); assertTrue(mockSourceReader.isClosed()); } @@ -177,6 +178,17 @@ public class SourceOperatorTest { assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0)); } + @Test + public void testDisposeAfterCloseOnlyClosesReaderOnce() throws Exception { + // Initialize the operator. + operator.initializeState(getStateContext()); + // Open the operator. + operator.open(); + operator.close(); + operator.dispose(); + assertEquals(1, mockSourceReader.getTimesClosed()); + } + // ---------------- helper methods ------------------------- private StateInitializationContext getStateContext() throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java index ce60719..17923a0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java @@ -200,6 +200,9 @@ public class SourceOperatorStreamTaskTest { // checkpoint is completed will block. getSourceReaderFromTask(testHarness).markAvailable(); processUntil(testHarness, checkpointFuture::isDone); + Future<Void> checkpointNotified = + testHarness.getStreamTask().notifyCheckpointCompleteAsync(checkpointId); + processUntil(testHarness, checkpointNotified::isDone); waitForAcknowledgeLatch.await(); }
