Repository: flink
Updated Branches:
  refs/heads/release-1.2 4a1aa4ce8 -> 80dc704fc


[FLINK-6311] [kinesis] NPE in FlinkKinesisConsumer if source was closed before run

This closes #3738.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80dc704f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80dc704f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80dc704f

Branch: refs/heads/release-1.2
Commit: 80dc704fc9552eb68b5b6b1e85dc971da35fce45
Parents: 4a1aa4c
Author: zhangminglei <[email protected]>
Authored: Wed Apr 19 17:43:57 2017 +0800
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Fri Apr 21 07:21:39 2017 +0800

----------------------------------------------------------------------
 .../connectors/kinesis/internals/KinesisDataFetcher.java      | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80dc704f/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index a06fdca..b642f49 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -150,7 +150,7 @@ public class KinesisDataFetcher<T> {
        private final KinesisProxyInterface kinesis;
 
        /** Thread that executed runFetcher() */
-       private Thread mainThread;
+       private volatile Thread mainThread;
 
        /**
         * The current number of shards that are actively read by this fetcher.
@@ -469,7 +469,10 @@ public class KinesisDataFetcher<T> {
         */
        public void shutdownFetcher() {
                running = false;
-               mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+
+               if (mainThread != null) {
+                       mainThread.interrupt(); // the main thread may be sleeping for the discovery interval
+               }
 
                if (LOG.isInfoEnabled()) {
                        LOG.info("Shutting down the shard consumer threads of subtask {} ...", indexOfThisConsumerSubtask);

Reply via email to