This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8e0b66bf750bd911024079a097f894f538d7793
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Sat Dec 5 13:47:26 2020 +0800

    [FLINK-20492][runtime] SourceOperator.dispose() should close the source reader.
---
 .../api/connector/source/mocks/MockSourceReader.java    | 12 ++++++++----
 .../flink/streaming/api/operators/SourceOperator.java   | 17 ++++++++++++++---
 .../runtime/tasks/SourceOperatorStreamTask.java         |  5 +++++
 .../streaming/api/operators/SourceOperatorTest.java     | 12 ++++++++++++
 .../runtime/tasks/SourceOperatorStreamTaskTest.java     |  3 +++
 5 files changed, 42 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 8eb4596..7008ae4 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -41,7 +41,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 
        private int currentSplitIndex = 0;
        private boolean started;
-       private boolean closed;
+       private int timesClosed;
        private boolean waitingForMoreSplits;
 
        @GuardedBy("this")
@@ -53,7 +53,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 
        public MockSourceReader(boolean waitingForMoreSplits, boolean markIdleOnNoSplits) {
                this.started = false;
-               this.closed = false;
+               this.timesClosed = 0;
                this.availableFuture = CompletableFuture.completedFuture(null);
                this.waitingForMoreSplits = waitingForMoreSplits;
                this.markIdleOnNoSplits = markIdleOnNoSplits;
@@ -115,7 +115,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 
        @Override
        public void close() throws Exception {
-               this.closed = true;
+               timesClosed++;
        }
 
        @Override
@@ -158,7 +158,11 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
        }
 
        public boolean isClosed() {
-               return closed;
+               return timesClosed > 0;
+       }
+
+       public int getTimesClosed() {
+               return timesClosed;
        }
 
        public List<MockSourceSplit> getAssignedSplits() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index b4d6db4..ab79f75 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -218,16 +218,27 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
        @Override
        public void close() throws Exception {
-               if (sourceReader != null) {
-                       sourceReader.close();
-               }
                if (eventTimeLogic != null) {
                        eventTimeLogic.stopPeriodicWatermarkEmits();
                }
+               if (sourceReader != null) {
+                       sourceReader.close();
+                       // Set the field to null so the reader won't be closed again in dispose().
+                       sourceReader = null;
+               }
                super.close();
        }
 
        @Override
+       public void dispose() throws Exception {
+               // We also need to close the source reader to make sure the resources
+               // are released if the task does not finish normally.
+               if (sourceReader != null) {
+                       sourceReader.close();
+               }
+       }
+
+       @Override
        public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
                // guarding an assumptions we currently make due to the fact that certain classes
                // assume a constant output
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index d55e542..beeeec7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -56,6 +56,11 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
        }
 
        @Override
+       protected CompletableFuture<Void> getCompletionFuture() {
+               return super.getCompletionFuture();
+       }
+
+       @Override
        public void init() throws Exception {
                final SourceOperator<T, ?> sourceOperator = headOperator;
                // reader initialization, which cannot happen in the constructor due to the
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
index c0ce943..5e16b60 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/SourceOperatorTest.java
@@ -91,6 +91,7 @@ public class SourceOperatorTest {
        @After
        public void cleanUp() throws Exception {
                operator.close();
+               operator.dispose();
                assertTrue(mockSourceReader.isClosed());
        }
 
@@ -177,6 +178,17 @@ public class SourceOperatorTest {
                assertEquals(100L, (long) mockSourceReader.getAbortedCheckpoints().get(0));
        }
 
+       @Test
+       public void testDisposeAfterCloseOnlyClosesReaderOnce() throws Exception {
+               // Initialize the operator.
+               operator.initializeState(getStateContext());
+               // Open the operator.
+               operator.open();
+               operator.close();
+               operator.dispose();
+               assertEquals(1, mockSourceReader.getTimesClosed());
+       }
+
        // ---------------- helper methods -------------------------
 
        private StateInitializationContext getStateContext() throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
index ce60719..17923a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTaskTest.java
@@ -200,6 +200,9 @@ public class SourceOperatorStreamTaskTest {
                // checkpoint is completed will block.
                getSourceReaderFromTask(testHarness).markAvailable();
                processUntil(testHarness, checkpointFuture::isDone);
+               Future<Void> checkpointNotified =
+                       testHarness.getStreamTask().notifyCheckpointCompleteAsync(checkpointId);
+               processUntil(testHarness, checkpointNotified::isDone);
                waitForAcknowledgeLatch.await();
        }
 

Reply via email to