[FLINK-5407] Handle snapshotting of null operator in chain
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81eaafac Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81eaafac Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81eaafac Branch: refs/heads/release-1.2 Commit: 81eaafac70a9ec543ae2e81b6dd006d80c137fa5 Parents: 68a2520 Author: Stefan Richter <[email protected]> Authored: Thu Jan 5 14:28:50 2017 +0100 Committer: Aljoscha Krettek <[email protected]> Committed: Thu Jan 12 17:51:02 2017 +0100 ---------------------------------------------------------------------- .../streaming/runtime/tasks/StreamTask.java | 45 ++++++++++++++------ 1 file changed, 33 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/81eaafac/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 3bbc53b..530401b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -901,8 +901,15 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(snapshotInProgressList.size()); for (OperatorSnapshotResult snapshotInProgress : snapshotInProgressList) { - operatorStatesBackend.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture())); - operatorStatesStream.add(FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture())); + if (null != snapshotInProgress) { + operatorStatesBackend.add( + 
FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateManagedFuture())); + operatorStatesStream.add( + FutureUtil.runIfNotDoneAndGet(snapshotInProgress.getOperatorStateRawFuture())); + } else { + operatorStatesBackend.add(null); + operatorStatesStream.add(null); + } } final long asyncEndNanos = System.nanoTime(); @@ -949,7 +956,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> public void close() { // cleanup/release ongoing snapshot operations for (OperatorSnapshotResult snapshotResult : snapshotInProgressList) { - snapshotResult.cancel(); + if (null != snapshotResult) { + snapshotResult.cancel(); + } } } } @@ -994,14 +1003,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> try { for (StreamOperator<?> op : allOperators) { - - createStreamFactory(op); - snapshotNonPartitionableState(op); - - OperatorSnapshotResult snapshotInProgress = - op.snapshotState(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), streamFactory); - - snapshotInProgressList.add(snapshotInProgress); + checkpointStreamOperator(op); } if (LOG.isDebugEnabled()) { @@ -1028,7 +1030,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> if (failed) { // Cleanup to release resources for (OperatorSnapshotResult operatorSnapshotResult : snapshotInProgressList) { - operatorSnapshotResult.cancel(); + if (null != operatorSnapshotResult) { + operatorSnapshotResult.cancel(); + } } if (LOG.isDebugEnabled()) { @@ -1038,7 +1042,24 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } } + } + + private void checkpointStreamOperator(StreamOperator<?> op) throws Exception { + if (null != op) { + createStreamFactory(op); + snapshotNonPartitionableState(op); + + OperatorSnapshotResult snapshotInProgress = op.snapshotState( + checkpointMetaData.getCheckpointId(), + checkpointMetaData.getTimestamp(), + streamFactory); + snapshotInProgressList.add(snapshotInProgress); + } else { + 
nonPartitionedStates.add(null); + OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult(); + snapshotInProgressList.add(emptySnapshotInProgress); + } } private void createStreamFactory(StreamOperator<?> operator) throws IOException {
