This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 7c7f083 [FLINK-20781] Avoid NPE after SourceOperator is closed.
7c7f083 is described below
commit 7c7f0839f5d13147c24a9e33fb93a7a6758e844b
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Jan 4 14:53:01 2021 +0800
[FLINK-20781] Avoid NPE after SourceOperator is closed.
---
.../org/apache/flink/streaming/api/operators/SourceOperator.java | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index df67464..e0ea388 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -126,6 +126,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
*/
private TimestampsAndWatermarks<OUT> eventTimeLogic;
+ /** Indicating whether the source operator has been closed. */
+ private boolean closed;
+
public SourceOperator(
FunctionWithException<SourceReaderContext, SourceReader<OUT, SplitT>, Exception>
readerFactory,
@@ -247,9 +250,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
}
if (sourceReader != null) {
sourceReader.close();
- // Set the field to null so the reader won't be closed again in dispose().
- sourceReader = null;
}
+ closed = true;
super.close();
}
@@ -257,7 +259,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr
public void dispose() throws Exception {
// We also need to close the source reader to make sure the resources
// are released if the task does not finish normally.
- if (sourceReader != null) {
+ if (!closed && sourceReader != null) {
sourceReader.close();
}
}