This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bbc94d750dbdc64aee7cb78de010a5943e14e637 Author: anton <[email protected]> AuthorDate: Fri May 21 13:34:57 2021 +0200 fixup: Create the new operator states instead of changing old one --- .../execution_checkpointing_configuration.html | 12 ++--- .../runtime/checkpoint/CheckpointCoordinator.java | 51 +++++++++++++--------- .../api/environment/CheckpointConfig.java | 6 ++- .../environment/ExecutionCheckpointingOptions.java | 2 +- 4 files changed, 42 insertions(+), 29 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html index 88dfc04..90b6e9bd 100644 --- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html @@ -21,12 +21,6 @@ <td>Externalized checkpoints write their meta data out to persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...] </tr> <tr> - <td><h5>execution.checkpointing.id-of-ignored-in-flight-data</h5></td> - <td style="word-wrap: break-word;">-1</td> - <td>Long</td> - <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td> - </tr> - <tr> <td><h5>execution.checkpointing.interval</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> @@ -57,6 +51,12 @@ <td>If enabled, a job recovery should fallback to checkpoint when there is a more recent savepoint.</td> </tr> <tr> + <td><h5>execution.checkpointing.recover-without-channel-state.checkpoint-id</h5></td> + <td style="word-wrap: break-word;">-1</td> + <td>Long</td> + <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td> + </tr> + <tr> <td><h5>execution.checkpointing.timeout</h5></td> <td style="word-wrap: break-word;">10 min</td> <td>Duration</td> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index c5c1e7e..90d94e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1600,30 +1600,39 @@ public class CheckpointCoordinator { } private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) { - Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates(); - - if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) { - // rewrite the operator state with empty in-flight data. - for (OperatorState operatorState : operatorStates.values()) { - for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry : - operatorState.getSubtaskStates().entrySet()) { - - OperatorSubtaskState subtaskState = subtaskStateEntry.getValue(); - if (!subtaskState.getResultSubpartitionState().isEmpty() - || !subtaskState.getInputChannelState().isEmpty()) { - operatorState.putState( - subtaskStateEntry.getKey(), - subtaskState - .toBuilder() - .setResultSubpartitionState(StateObjectCollection.empty()) - .setInputChannelState(StateObjectCollection.empty()) - .build()); - } - } + Map<OperatorID, OperatorState> originalOperatorStates = checkpoint.getOperatorStates(); + + if (checkpoint.getCheckpointID() != checkpointIdOfIgnoredInFlightData) { + // Don't do any changes if it is not required. + return originalOperatorStates; + } + + HashMap<OperatorID, OperatorState> newStates = new HashMap<>(); + // Create the new operator states without in-flight data. + for (OperatorState originalOperatorState : originalOperatorStates.values()) { + OperatorState newState = + new OperatorState( + originalOperatorState.getOperatorID(), + originalOperatorState.getParallelism(), + originalOperatorState.getMaxParallelism()); + + newStates.put(newState.getOperatorID(), newState); + + for (Map.Entry<Integer, OperatorSubtaskState> originalSubtaskStateEntry : + originalOperatorState.getSubtaskStates().entrySet()) { + + newState.putState( + originalSubtaskStateEntry.getKey(), + originalSubtaskStateEntry + .getValue() + .toBuilder() + .setResultSubpartitionState(StateObjectCollection.empty()) + .setInputChannelState(StateObjectCollection.empty()) + .build()); } } - return operatorStates; + return newStates; } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 02ee15b..a087a39 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -65,6 +65,9 @@ public class CheckpointConfig implements java.io.Serializable { public static final int UNDEFINED_TOLERABLE_CHECKPOINT_NUMBER = -1; + /** Default id of checkpoint for which in-flight data should be ignored on recovery. */ + public static final int DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = -1; + // ------------------------------------------------------------------------ /** Checkpointing mode (exactly-once vs. at-least-once). */ @@ -92,7 +95,8 @@ public class CheckpointConfig implements java.io.Serializable { private boolean unalignedCheckpointsEnabled; /** Id of checkpoint for which in-flight data should be ignored on recovery. */ - private long checkpointIdOfIgnoredInFlightData; + private long checkpointIdOfIgnoredInFlightData = + DEFAULT_CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA; private Duration alignmentTimeout = ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 40c5a6e..1c9d97a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -198,7 +198,7 @@ public class ExecutionCheckpointingOptions { .build()); public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = - ConfigOptions.key("execution.checkpointing.id-of-ignored-in-flight-data") + ConfigOptions.key("execution.checkpointing.recover-without-channel-state.checkpoint-id") .longType() .defaultValue(-1L) .withDescription(
