Repository: flink
Updated Branches:
  refs/heads/master 42328bd9b -> a0249d993
[FLINK-6311] [kinesis] NPE in FlinkKinesisConsumer if source was closed before run

This closes #3738.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0249d99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0249d99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0249d99

Branch: refs/heads/master
Commit: a0249d9935d54fbd6bb6c2cc130f51ce2ccafac3
Parents: 42328bd
Author: zhangminglei <[email protected]>
Authored: Wed Apr 19 17:43:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Apr 21 07:15:10 2017 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/internals/KinesisDataFetcher.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a0249d99/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 46847b3..8f7ca6c 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -149,7 +149,7 @@ public class KinesisDataFetcher<T> {
 	private final KinesisProxyInterface kinesis;
 
 	/** Thread that executed runFetcher() */
-	private Thread mainThread;
+	private volatile Thread mainThread;
 
 	/**
 	 * The current number of shards that are actively read by this fetcher.
@@ -408,7 +408,10 @@ public class KinesisDataFetcher<T> {
 	 */
 	public void shutdownFetcher() {
 		running = false;
-		mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+
+		if (mainThread != null) {
+			mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+		}
 
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
