[FLINK-5407] Handle snapshoting null-operator in chain

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81eaafac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81eaafac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81eaafac

Branch: refs/heads/release-1.2
Commit: 81eaafac70a9ec543ae2e81b6dd006d80c137fa5
Parents: 68a2520
Author: Stefan Richter <[email protected]>
Authored: Thu Jan 5 14:28:50 2017 +0100
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Jan 12 17:51:02 2017 +0100

----------------------------------------------------------------------
 .../streaming/runtime/tasks/StreamTask.java     | 45 ++++++++++++++------
 1 file changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81eaafac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3bbc53b..530401b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -901,8 +901,15 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                List<OperatorStateHandle> operatorStatesStream 
= new ArrayList<>(snapshotInProgressList.size());
 
                                for (OperatorSnapshotResult snapshotInProgress 
: snapshotInProgressList) {
-                                       
operatorStatesBackend.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
-                                       
operatorStatesStream.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+                                       if (null != snapshotInProgress) {
+                                               operatorStatesBackend.add(
+                                                               
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture()));
+                                               operatorStatesStream.add(
+                                                               
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture()));
+                                       } else {
+                                               operatorStatesBackend.add(null);
+                                               operatorStatesStream.add(null);
+                                       }
                                }
 
                                final long asyncEndNanos = System.nanoTime();
@@ -949,7 +956,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                public void close() {
                        // cleanup/release ongoing snapshot operations
                        for (OperatorSnapshotResult snapshotResult : 
snapshotInProgressList) {
-                               snapshotResult.cancel();
+                               if (null != snapshotResult) {
+                                       snapshotResult.cancel();
+                               }
                        }
                }
        }
@@ -994,14 +1003,7 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        try {
 
                                for (StreamOperator<?> op : allOperators) {
-
-                                       createStreamFactory(op);
-                                       snapshotNonPartitionableState(op);
-
-                                       OperatorSnapshotResult 
snapshotInProgress =
-                                                       
op.snapshotState(checkpointMetaData.getCheckpointId(), 
checkpointMetaData.getTimestamp(), streamFactory);
-
-                                       
snapshotInProgressList.add(snapshotInProgress);
+                                       checkpointStreamOperator(op);
                                }
 
                                if (LOG.isDebugEnabled()) {
@@ -1028,7 +1030,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                if (failed) {
                                        // Cleanup to release resources
                                        for (OperatorSnapshotResult 
operatorSnapshotResult : snapshotInProgressList) {
-                                               operatorSnapshotResult.cancel();
+                                               if (null != 
operatorSnapshotResult) {
+                                                       
operatorSnapshotResult.cancel();
+                                               }
                                        }
 
                                        if (LOG.isDebugEnabled()) {
@@ -1038,7 +1042,24 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                                        }
                                }
                        }
+               }
+
+               private void checkpointStreamOperator(StreamOperator<?> op) 
throws Exception {
+                       if (null != op) {
+                               createStreamFactory(op);
+                               snapshotNonPartitionableState(op);
+
+                               OperatorSnapshotResult snapshotInProgress = 
op.snapshotState(
+                                               
checkpointMetaData.getCheckpointId(),
+                                               
checkpointMetaData.getTimestamp(),
+                                               streamFactory);
 
+                               snapshotInProgressList.add(snapshotInProgress);
+                       } else {
+                               nonPartitionedStates.add(null);
+                               OperatorSnapshotResult emptySnapshotInProgress 
= new OperatorSnapshotResult();
+                               
snapshotInProgressList.add(emptySnapshotInProgress);
+                       }
                }
 
                private void createStreamFactory(StreamOperator<?> operator) 
throws IOException {

Reply via email to