Repository: flink
Updated Branches:
  refs/heads/master 42328bd9b -> a0249d993


[FLINK-6311] [kinesis] NPE in FlinkKinesisConsumer if source was closed before 
run

This closes #3738.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0249d99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0249d99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0249d99

Branch: refs/heads/master
Commit: a0249d9935d54fbd6bb6c2cc130f51ce2ccafac3
Parents: 42328bd
Author: zhangminglei <[email protected]>
Authored: Wed Apr 19 17:43:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Apr 21 07:15:10 2017 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/internals/KinesisDataFetcher.java      | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0249d99/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index 46847b3..8f7ca6c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -149,7 +149,7 @@ public class KinesisDataFetcher<T> {
        private final KinesisProxyInterface kinesis;
 
        /** Thread that executed runFetcher() */
-       private Thread mainThread;
+       private volatile Thread mainThread;
 
        /**
         * The current number of shards that are actively read by this fetcher.
@@ -408,7 +408,10 @@ public class KinesisDataFetcher<T> {
         */
        public void shutdownFetcher() {
                running = false;
-               mainThread.interrupt(); // the main thread may be sleeping for 
the discovery interval
+
+               if (mainThread != null) {
+                       mainThread.interrupt(); // the main thread may be 
sleeping for the discovery interval
+               }
 
                if (LOG.isInfoEnabled()) {
                        LOG.info("Shutting down the shard consumer threads of 
subtask {} ...", indexOfThisConsumerSubtask);

Reply via email to