Repository: flink
Updated Branches:
  refs/heads/release-1.2 4a1aa4ce8 -> 80dc704fc

[FLINK-6311] [kinesis] NPE in FlinkKinesisConsumer if source was closed before run

This closes #3738.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80dc704f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80dc704f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80dc704f

Branch: refs/heads/release-1.2
Commit: 80dc704fc9552eb68b5b6b1e85dc971da35fce45
Parents: 4a1aa4c
Author: zhangminglei <[email protected]>
Authored: Wed Apr 19 17:43:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Apr 21 07:21:39 2017 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/internals/KinesisDataFetcher.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/80dc704f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index a06fdca..b642f49 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -150,7 +150,7 @@ public class KinesisDataFetcher<T> {
     private final KinesisProxyInterface kinesis;
 
     /** Thread that executed runFetcher() */
-    private Thread mainThread;
+    private volatile Thread mainThread;
 
     /**
      * The current number of shards that are actively read by this fetcher.
@@ -469,7 +469,10 @@ public class KinesisDataFetcher<T> {
      */
     public void shutdownFetcher() {
         running = false;
-        mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+
+        if (mainThread != null) {
+            mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+        }
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);
