This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 7c7f083  [FLINK-20781] Avoid NPE after SourceOperator is closed.
7c7f083 is described below

commit 7c7f0839f5d13147c24a9e33fb93a7a6758e844b
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Jan 4 14:53:01 2021 +0800

    [FLINK-20781] Avoid NPE after SourceOperator is closed.
---
 .../org/apache/flink/streaming/api/operators/SourceOperator.java  | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index df67464..e0ea388 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -126,6 +126,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
      */
     private TimestampsAndWatermarks<OUT> eventTimeLogic;
 
+    /** Indicating whether the source operator has been closed. */
+    private boolean closed;
+
     public SourceOperator(
             FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
                     readerFactory,
@@ -247,9 +250,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
         }
         if (sourceReader != null) {
             sourceReader.close();
-            // Set the field to null so the reader won't be closed again in dispose().
-            sourceReader = null;
         }
+        closed = true;
         super.close();
     }
 
@@ -257,7 +259,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
     public void dispose() throws Exception {
         // We also need to close the source reader to make sure the resources
         // are released if the task does not finish normally.
-        if (sourceReader != null) {
+        if (!closed && sourceReader != null) {
             sourceReader.close();
         }
     }

Reply via email to