This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 11f59bf60a0bab9f921cce9b43f75e57fb099a47 Author: Anton Kalashnikov <[email protected]> AuthorDate: Wed May 19 17:32:09 2021 +0200 [FLINK-22684][runtime] Added ability to ignore in-flight data during the recovery --- .../execution_checkpointing_configuration.html | 6 + .../jobmanager/JMXJobManagerMetricTest.java | 1 + .../runtime/checkpoint/CheckpointCoordinator.java | 33 +++- .../runtime/checkpoint/OperatorSubtaskState.java | 12 ++ .../tasks/CheckpointCoordinatorConfiguration.java | 34 +++- .../CheckpointCoordinatorMasterHooksTest.java | 1 + .../CheckpointCoordinatorRestoringTest.java | 140 +++++++++++++-- .../CheckpointSettingsSerializableTest.java | 1 + .../checkpoint/CheckpointStatsTrackerTest.java | 1 + .../ExecutionGraphCheckpointCoordinatorTest.java | 1 + .../FailoverStrategyCheckpointCoordinatorTest.java | 1 + .../checkpoint/OperatorSubtaskStateTest.java | 54 ++++++ .../executiongraph/ArchivedExecutionGraphTest.java | 1 + .../DefaultExecutionGraphDeploymentTest.java | 1 + .../flink/runtime/jobgraph/JobGraphTest.java | 1 + .../tasks/JobCheckpointingSettingsTest.java | 1 + .../runtime/scheduler/SchedulerTestingUtils.java | 1 + .../api/environment/CheckpointConfig.java | 30 ++++ .../environment/ExecutionCheckpointingOptions.java | 17 ++ .../api/graph/StreamingJobGraphGenerator.java | 2 + .../JobMasterStopWithSavepointITCase.java | 1 + .../jobmaster/JobMasterTriggerSavepointITCase.java | 1 + .../checkpointing/IgnoreInFlightDataITCase.java | 189 +++++++++++++++++++++ 23 files changed, 508 insertions(+), 22 deletions(-) diff --git a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html index c9786ab..88dfc04 100644 --- a/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html +++ b/docs/layouts/shortcodes/generated/execution_checkpointing_configuration.html @@ -21,6 +21,12 @@ <td>Externalized checkpoints write their meta data out to 
persistent storage and are not automatically cleaned up when the owning job fails or is suspended (terminating with job status <code class="highlighter-rouge">JobStatus#FAILED</code> or <code class="highlighter-rouge">JobStatus#SUSPENDED</code>. In this case, you have to manually clean up the checkpoint state, both the meta data and actual program state.<br /><br />The mode defines how an externalized checkpoint should [...] </tr> <tr> + <td><h5>execution.checkpointing.id-of-ignored-in-flight-data</h5></td> + <td style="word-wrap: break-word;">-1</td> + <td>Long</td> + <td>Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.<br /><br />It is better to keep this value empty until there is explicit needs to restore from the specific checkpoint without in-flight data.<br /></td> + </tr> + <tr> <td><h5>execution.checkpointing.interval</h5></td> <td style="word-wrap: break-word;">(none)</td> <td>Duration</td> diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 4ae173a..4044bf5 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -101,6 +101,7 @@ public class JMXJobManagerMetricTest extends TestLogger { true, false, false, + 0, 0), null); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index b31dd8f..c5c1e7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -206,6 +206,9 @@ public class CheckpointCoordinator { private boolean isPreferCheckpointForRecovery; + /** Id of checkpoint for which in-flight data should be ignored on recovery. */ + private final long checkpointIdOfIgnoredInFlightData; + private final CheckpointFailureManager failureManager; private final Clock clock; @@ -309,6 +312,7 @@ public class CheckpointCoordinator { this.isExactlyOnceMode = chkConfig.isExactlyOnce(); this.unalignedCheckpointsEnabled = chkConfig.isUnalignedCheckpointsEnabled(); this.alignmentTimeout = chkConfig.getAlignmentTimeout(); + this.checkpointIdOfIgnoredInFlightData = chkConfig.getCheckpointIdOfIgnoredInFlightData(); this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.masterHooks = new HashMap<>(); @@ -1553,7 +1557,7 @@ public class CheckpointCoordinator { LOG.info("Restoring job {} from {}.", job, latest); // re-assign the task states - final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates(); + final Map<OperatorID, OperatorState> operatorStates = extractOperatorStates(latest); StateAssignmentOperation stateAssignmentOperation = new StateAssignmentOperation( @@ -1595,6 +1599,33 @@ public class CheckpointCoordinator { } } + private Map<OperatorID, OperatorState> extractOperatorStates(CompletedCheckpoint checkpoint) { + Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates(); + + if (checkpoint.getCheckpointID() == checkpointIdOfIgnoredInFlightData) { + // rewrite the operator state with empty in-flight data. 
+ for (OperatorState operatorState : operatorStates.values()) { + for (Map.Entry<Integer, OperatorSubtaskState> subtaskStateEntry : + operatorState.getSubtaskStates().entrySet()) { + + OperatorSubtaskState subtaskState = subtaskStateEntry.getValue(); + if (!subtaskState.getResultSubpartitionState().isEmpty() + || !subtaskState.getInputChannelState().isEmpty()) { + operatorState.putState( + subtaskStateEntry.getKey(), + subtaskState + .toBuilder() + .setResultSubpartitionState(StateObjectCollection.empty()) + .setInputChannelState(StateObjectCollection.empty()) + .build()); + } + } + } + } + + return operatorStates; + } + /** * Restore the state with given savepoint. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index b6a9f64..9d0739a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -307,6 +307,18 @@ public class OperatorSubtaskState implements CompositeStateHandle { || resultSubpartitionState.hasState(); } + public Builder toBuilder() { + return builder() + .setManagedKeyedState(managedKeyedState) + .setManagedOperatorState(managedOperatorState) + .setRawOperatorState(rawOperatorState) + .setRawKeyedState(rawKeyedState) + .setInputChannelState(inputChannelState) + .setResultSubpartitionState(resultSubpartitionState) + .setInputRescalingDescriptor(inputRescalingDescriptor) + .setOutputRescalingDescriptor(outputRescalingDescriptor); + } + public static Builder builder() { return new Builder(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index f679072..0fd6f87 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -64,6 +64,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { private final long alignmentTimeout; + private final long checkpointIdOfIgnoredInFlightData; + /** @deprecated use {@link #builder()}. */ @Deprecated @VisibleForTesting @@ -76,7 +78,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { boolean isExactlyOnce, boolean isUnalignedCheckpoint, boolean isPreferCheckpointForRecovery, - int tolerableCpFailureNumber) { + int tolerableCpFailureNumber, + long checkpointIdOfIgnoredInFlightData) { this( checkpointInterval, checkpointTimeout, @@ -87,7 +90,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { isPreferCheckpointForRecovery, tolerableCpFailureNumber, isUnalignedCheckpoint, - 0); + 0, + checkpointIdOfIgnoredInFlightData); } private CheckpointCoordinatorConfiguration( @@ -100,7 +104,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { boolean isPreferCheckpointForRecovery, int tolerableCpFailureNumber, boolean isUnalignedCheckpointsEnabled, - long alignmentTimeout) { + long alignmentTimeout, + long checkpointIdOfIgnoredInFlightData) { // sanity checks if (checkpointInterval < MINIMAL_CHECKPOINT_TIME @@ -124,6 +129,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber; this.isUnalignedCheckpointsEnabled = isUnalignedCheckpointsEnabled; this.alignmentTimeout = alignmentTimeout; + this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData; } public long getCheckpointInterval() { @@ -166,6 +172,10 @@ public class CheckpointCoordinatorConfiguration implements Serializable { return alignmentTimeout; } + public long 
getCheckpointIdOfIgnoredInFlightData() { + return checkpointIdOfIgnoredInFlightData; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -183,7 +193,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { && isUnalignedCheckpointsEnabled == that.isUnalignedCheckpointsEnabled && checkpointRetentionPolicy == that.checkpointRetentionPolicy && isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery - && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber; + && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber + && checkpointIdOfIgnoredInFlightData == that.checkpointIdOfIgnoredInFlightData; } @Override @@ -197,7 +208,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { isExactlyOnce, isUnalignedCheckpointsEnabled, isPreferCheckpointForRecovery, - tolerableCheckpointFailureNumber); + tolerableCheckpointFailureNumber, + checkpointIdOfIgnoredInFlightData); } @Override @@ -221,6 +233,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { + isPreferCheckpointForRecovery + ", tolerableCheckpointFailureNumber=" + tolerableCheckpointFailureNumber + + ", checkpointIdOfIgnoredInFlightData=" + + checkpointIdOfIgnoredInFlightData + '}'; } @@ -241,6 +255,7 @@ public class CheckpointCoordinatorConfiguration implements Serializable { private int tolerableCheckpointFailureNumber; private boolean isUnalignedCheckpointsEnabled; private long alignmentTimeout = 0; + private long checkpointIdOfIgnoredInFlightData; public CheckpointCoordinatorConfiguration build() { return new CheckpointCoordinatorConfiguration( @@ -253,7 +268,8 @@ public class CheckpointCoordinatorConfiguration implements Serializable { isPreferCheckpointForRecovery, tolerableCheckpointFailureNumber, isUnalignedCheckpointsEnabled, - alignmentTimeout); + alignmentTimeout, + checkpointIdOfIgnoredInFlightData); } public CheckpointCoordinatorConfigurationBuilder 
setCheckpointInterval( @@ -314,5 +330,11 @@ public class CheckpointCoordinatorConfiguration implements Serializable { this.alignmentTimeout = alignmentTimeout; return this; } + + public CheckpointCoordinatorConfigurationBuilder setCheckpointIdOfIgnoredInFlightData( + long checkpointIdOfIgnoredInFlightData) { + this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData; + return this; + } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index 0dd7104..86337f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -474,6 +474,7 @@ public class CheckpointCoordinatorMasterHooksTest { true, false, false, + 0, 0); Executor executor = Executors.directExecutor(); return new CheckpointCoordinator( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java index 4bc02b6..6559166 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java @@ -59,11 +59,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Collections.singletonList; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.compareKeyedState; import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.comparePartitionableState; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateChainedPartitionableStateHandle; @@ -71,6 +73,8 @@ import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUt import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockSubtaskState; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.verifyStateRestore; +import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle; +import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -283,8 +287,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { long checkpointId = checkpointIDCounter.getLast(); KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0); - List<SerializableObject> testStates = - Collections.singletonList(new SerializableObject()); + List<SerializableObject> testStates = singletonList(new SerializableObject()); KeyedStateHandle serializedKeyGroupStates = generateKeyGroupState(keyGroupRange, testStates); @@ -319,7 +322,7 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { KeyGroupRange keyGroupRangeForSavepoint = KeyGroupRange.of(1, 1); List<SerializableObject> testStatesForSavepoint = - Collections.singletonList(new SerializableObject()); + singletonList(new SerializableObject()); KeyedStateHandle serializedKeyGroupStatesForSavepoint = generateKeyGroupState(keyGroupRangeForSavepoint, testStatesForSavepoint); @@ -441,6 +444,9 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { 
.setManagedOperatorState(opStateBackend) .setManagedKeyedState(keyedStateBackend) .setRawKeyedState(keyedStateRaw) + .setInputChannelState( + StateObjectCollection.singleton( + createNewInputChannelStateHandle(3, new Random()))) .build(); TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); taskOperatorSubtaskStates.putSubtaskStateByOperatorID( @@ -474,10 +480,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { generatePartitionableStateHandle(jobVertexID2, index, 2, 8, false); OperatorStateHandle opStateRaw = generatePartitionableStateHandle(jobVertexID2, index, 2, 8, true); - expectedOpStatesBackend.add( - new ChainedStateHandle<>(Collections.singletonList(opStateBackend))); - expectedOpStatesRaw.add( - new ChainedStateHandle<>(Collections.singletonList(opStateRaw))); + expectedOpStatesBackend.add(new ChainedStateHandle<>(singletonList(opStateBackend))); + expectedOpStatesRaw.add(new ChainedStateHandle<>(singletonList(opStateRaw))); OperatorSubtaskState operatorSubtaskState = OperatorSubtaskState.builder() @@ -574,11 +578,8 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { if (idx == headOpIndex) { Collection<KeyedStateHandle> keyedStateBackend = opState.getManagedKeyedState(); Collection<KeyedStateHandle> keyGroupStateRaw = opState.getRawKeyedState(); - compareKeyedState( - Collections.singletonList(originalKeyedStateBackend), - keyedStateBackend); - compareKeyedState( - Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw); + compareKeyedState(singletonList(originalKeyedStateBackend), keyedStateBackend); + compareKeyedState(singletonList(originalKeyedStateRaw), keyGroupStateRaw); } } actualOpStatesBackend.add(allParallelManagedOpStates); @@ -1032,13 +1033,122 @@ public class CheckpointCoordinatorRestoringTest extends TestLogger { Collection<KeyedStateHandle> keyedStateBackend = headOpState.getManagedKeyedState(); Collection<KeyedStateHandle> keyGroupStateRaw = 
headOpState.getRawKeyedState(); - compareKeyedState( - Collections.singletonList(originalKeyedStateBackend), keyedStateBackend); - compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw); + compareKeyedState(singletonList(originalKeyedStateBackend), keyedStateBackend); + compareKeyedState(singletonList(originalKeyedStateRaw), keyGroupStateRaw); } comparePartitionableState( expectedManagedOperatorStates.get(0), actualManagedOperatorStates); comparePartitionableState(expectedRawOperatorStates.get(0), actualRawOperatorStates); } + + @Test + public void testRestoreLatestCheckpointedStateWithoutInFlightData() throws Exception { + // given: Operator with not empty states. + final JobVertexID jobVertexID = new JobVertexID(); + int parallelism1 = 3; + int maxParallelism1 = 42; + + CompletedCheckpointStore completedCheckpointStore = new EmbeddedCompletedCheckpointStore(); + + final ExecutionGraph graph = + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(jobVertexID, parallelism1, maxParallelism1) + .build(); + + final ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = + new CheckpointCoordinatorBuilder() + .setExecutionGraph(graph) + .setCompletedCheckpointStore(completedCheckpointStore) + .setCheckpointCoordinatorConfiguration( + new CheckpointCoordinatorConfigurationBuilder() + .setCheckpointIdOfIgnoredInFlightData(1) + .build()) + .setTimer(manuallyTriggeredScheduledExecutor) + .build(); + + // trigger the checkpoint + coord.triggerCheckpoint(false); + manuallyTriggeredScheduledExecutor.triggerAll(); + + assertEquals(1, coord.getPendingCheckpoints().size()); + long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet()); + + List<KeyGroupRange> keyGroupPartitions1 = + StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1); + + Random random = new 
Random(); + // fill the states and complete the checkpoint. + for (int index = 0; index < jobVertex.getParallelism(); index++) { + OperatorSubtaskState operatorSubtaskState = + OperatorSubtaskState.builder() + .setManagedOperatorState( + generatePartitionableStateHandle( + jobVertexID, index, 2, 8, false)) + .setRawOperatorState( + generatePartitionableStateHandle( + jobVertexID, index, 2, 8, true)) + .setManagedKeyedState( + generateKeyGroupState( + jobVertexID, keyGroupPartitions1.get(index), false)) + .setRawKeyedState( + generateKeyGroupState( + jobVertexID, keyGroupPartitions1.get(index), true)) + .setInputChannelState( + StateObjectCollection.singleton( + createNewInputChannelStateHandle(3, random))) + .setResultSubpartitionState( + StateObjectCollection.singleton( + createNewResultSubpartitionStateHandle(3, random))) + .build(); + TaskStateSnapshot taskOperatorSubtaskStates = new TaskStateSnapshot(); + taskOperatorSubtaskStates.putSubtaskStateByOperatorID( + OperatorID.fromJobVertexID(jobVertexID), operatorSubtaskState); + + AcknowledgeCheckpoint acknowledgeCheckpoint = + new AcknowledgeCheckpoint( + graph.getJobID(), + jobVertex + .getTaskVertices()[index] + .getCurrentExecutionAttempt() + .getAttemptId(), + checkpointId, + new CheckpointMetrics(), + taskOperatorSubtaskStates); + + coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO); + } + + assertEquals(1, coord.getSuccessfulCheckpoints().size()); + + // when: Restore latest checkpoint without in-flight data. + Set<ExecutionJobVertex> tasks = new HashSet<>(); + tasks.add(jobVertex); + assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false)); + + // then: All states should be restored successfully except InputChannel and + // ResultSubpartition which should be ignored. 
+ verifyStateRestore(jobVertexID, jobVertex, keyGroupPartitions1); + for (int i = 0; i < jobVertex.getParallelism(); i++) { + JobManagerTaskRestore taskRestore = + jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore(); + Assert.assertEquals(1L, taskRestore.getRestoreCheckpointId()); + TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot(); + + OperatorSubtaskState operatorState = + stateSnapshot.getSubtaskStateByOperatorID( + OperatorID.fromJobVertexID(jobVertexID)); + + assertTrue(operatorState.getInputChannelState().isEmpty()); + assertTrue(operatorState.getResultSubpartitionState().isEmpty()); + + assertFalse(operatorState.getRawOperatorState().isEmpty()); + assertFalse(operatorState.getManagedOperatorState().isEmpty()); + assertFalse(operatorState.getRawKeyedState().isEmpty()); + assertFalse(operatorState.getManagedOperatorState().isEmpty()); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index d0de6d1..9950f2c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -86,6 +86,7 @@ public class CheckpointSettingsSerializableTest extends TestLogger { true, false, false, + 0, 0), new SerializedValue<StateBackend>(new CustomStateBackend(outOfClassPath)), new SerializedValue<CheckpointStorage>( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index 3c134cb..927b809 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -60,6 +60,7 @@ public class CheckpointStatsTrackerTest { false, false, false, + 0, 0), null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index d53be67..50c7dd1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -145,6 +145,7 @@ public class ExecutionGraphCheckpointCoordinatorTest extends TestLogger { true, false, false, + 0, 0); final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(chkConfig, null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java index 9396ef0..ee71761 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java @@ -70,6 +70,7 @@ public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger { true, false, false, + 0, 0); CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java index 274e7fd..a764c76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskStateTest.java @@ -19,16 +19,28 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.InputChannelStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.ResultSubpartitionStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.junit.Test; +import java.io.IOException; +import java.util.Collections; +import java.util.Random; + import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.apache.commons.lang3.builder.EqualsBuilder.reflectionEquals; +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState; +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generatePartitionableStateHandle; +import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewInputChannelStateHandle; +import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** {@link OperatorSubtaskState} test. */ public class OperatorSubtaskStateTest { @@ -50,6 +62,48 @@ public class OperatorSubtaskStateTest { .discardState(); } + @Test + public void testToBuilderCorrectness() throws IOException { + // given: Initialized operator subtask state. 
+ JobVertexID jobVertexID = new JobVertexID(); + int index = 0; + Random random = new Random(); + + OperatorSubtaskState operatorSubtaskState = + OperatorSubtaskState.builder() + .setManagedOperatorState( + generatePartitionableStateHandle(jobVertexID, index, 2, 8, false)) + .setRawOperatorState( + generatePartitionableStateHandle(jobVertexID, index, 2, 8, true)) + .setManagedKeyedState( + generateKeyGroupState(jobVertexID, new KeyGroupRange(0, 11), false)) + .setRawKeyedState( + generateKeyGroupState(jobVertexID, new KeyGroupRange(0, 9), true)) + .setInputChannelState( + StateObjectCollection.singleton( + createNewInputChannelStateHandle(3, random))) + .setResultSubpartitionState( + StateObjectCollection.singleton( + createNewResultSubpartitionStateHandle(3, random))) + .setInputRescalingDescriptor( + new InflightDataRescalingDescriptor( + new int[1], + new RescaleMappings[0], + Collections.singleton(1))) + .setOutputRescalingDescriptor( + new InflightDataRescalingDescriptor( + new int[1], + new RescaleMappings[0], + Collections.singleton(2))) + .build(); + + // when: Copy the operator subtask state. + OperatorSubtaskState operatorSubtaskStateCopy = operatorSubtaskState.toBuilder().build(); + + // then: It should be equal to original one. 
+ assertTrue(reflectionEquals(operatorSubtaskState, operatorSubtaskStateCopy)); + } + private ResultSubpartitionStateHandle buildSubpartitionHandle( StreamStateHandle delegate, int subPartitionIdx1) { return new ResultSubpartitionStateHandle( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 3292579..073d214 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -101,6 +101,7 @@ public class ArchivedExecutionGraphTest extends TestLogger { true, false, false, + 0, 0); JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(chkConfig, null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java index ec722d8..3bbd94f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java @@ -659,6 +659,7 @@ public class DefaultExecutionGraphDeploymentTest extends TestLogger { false, false, false, + 0, 0), null)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index d779c99..dd4424b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -398,6 +398,7 @@ public class JobGraphTest extends TestLogger { true, false, false, + 0, 0); 
return new JobCheckpointingSettings(checkpointCoordinatorConfiguration, null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java index 3677785..8a7a738 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java @@ -45,6 +45,7 @@ public class JobCheckpointingSettingsTest { false, false, false, + 0, 0), new SerializedValue<>(new MemoryStateBackend())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index ac5b1cd..80e8652c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -162,6 +162,7 @@ public class SchedulerTestingUtils { false, false, false, + 0, 0); SerializedValue<StateBackend> serializedStateBackend = null; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 76abc50..02ee15b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -91,6 +91,9 @@ public class CheckpointConfig implements java.io.Serializable { /** Flag to enable unaligned checkpoints. */ private boolean unalignedCheckpointsEnabled; + /** Id of checkpoint for which in-flight data should be ignored on recovery. 
*/ + private long checkpointIdOfIgnoredInFlightData; + private Duration alignmentTimeout = ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT.defaultValue(); @@ -148,6 +151,8 @@ public class CheckpointConfig implements java.io.Serializable { this.forceCheckpointing = checkpointConfig.forceCheckpointing; this.forceUnalignedCheckpoints = checkpointConfig.forceUnalignedCheckpoints; this.storage = checkpointConfig.getCheckpointStorage(); + this.checkpointIdOfIgnoredInFlightData = + checkpointConfig.getCheckpointIdOfIgnoredInFlightData(); } public CheckpointConfig() {} @@ -668,6 +673,28 @@ public class CheckpointConfig implements java.io.Serializable { return this.storage; } + /** + * Sets the checkpoint id for which the in-flight data will be ignored for all operators when + * recovering from this checkpoint. + * + * @param checkpointIdOfIgnoredInFlightData Checkpoint id for which in-flight data should be + * ignored. + * @see #getCheckpointIdOfIgnoredInFlightData + */ + @PublicEvolving + public void setCheckpointIdOfIgnoredInFlightData(long checkpointIdOfIgnoredInFlightData) { + this.checkpointIdOfIgnoredInFlightData = checkpointIdOfIgnoredInFlightData; + } + + /** + * @return Checkpoint id for which in-flight data should be ignored. + * @see #setCheckpointIdOfIgnoredInFlightData + */ + @PublicEvolving + public long getCheckpointIdOfIgnoredInFlightData() { + return checkpointIdOfIgnoredInFlightData; + } + /** Cleanup behaviour for externalized checkpoints when the job is cancelled.
*/ @PublicEvolving public enum ExternalizedCheckpointCleanup { @@ -750,6 +777,9 @@ public class CheckpointConfig implements java.io.Serializable { .getOptional(ExecutionCheckpointingOptions.ENABLE_UNALIGNED) .ifPresent(this::enableUnalignedCheckpoints); configuration + .getOptional(ExecutionCheckpointingOptions.CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA) + .ifPresent(this::setCheckpointIdOfIgnoredInFlightData); + configuration .getOptional(ExecutionCheckpointingOptions.ALIGNMENT_TIMEOUT) .ifPresent(this::setAlignmentTimeout); configuration diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 7cdd194..40c5a6e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -196,4 +196,21 @@ public class ExecutionCheckpointingOptions { .text( "Forces unaligned checkpoints, particularly allowing them for iterative jobs.") .build()); + + public static final ConfigOption<Long> CHECKPOINT_ID_OF_IGNORED_IN_FLIGHT_DATA = + ConfigOptions.key("execution.checkpointing.id-of-ignored-in-flight-data") + .longType() + .defaultValue(-1L) + .withDescription( + Description.builder() + .text( + "Checkpoint id for which in-flight data should be ignored in case of the recovery from this checkpoint.") + .linebreak() + .linebreak() + .text( + "It is better to keep this value empty until " + + "there is explicit needs to restore from " + + "the specific checkpoint without in-flight data.") + .linebreak() + .build()); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 425350a..f89641d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1312,6 +1312,8 @@ public class StreamingJobGraphGenerator { .setTolerableCheckpointFailureNumber( cfg.getTolerableCheckpointFailureNumber()) .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled()) + .setCheckpointIdOfIgnoredInFlightData( + cfg.getCheckpointIdOfIgnoredInFlightData()) .setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis()) .build(), serializedStateBackend, diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java index 4820eb0..97c8f01 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java @@ -297,6 +297,7 @@ public class JobMasterStopWithSavepointITCase extends AbstractTestBase { true, false, false, + 0, 0), null); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java index 39ea862..6cb55a4 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java @@ -108,6 +108,7 @@ public class JobMasterTriggerSavepointITCase extends AbstractTestBase { true, false, false, + 0, 0), null); diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java new file mode 100644 index 0000000..a8bdf6c --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IgnoreInFlightDataITCase.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Iterator; +import java.util.concurrent.locks.LockSupport; + +import static java.util.Collections.singletonList; +import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertEquals; + +/** Test of ignoring in-flight data during recovery. 
*/ +public class IgnoreInFlightDataITCase extends TestLogger { + @ClassRule + public static final MiniClusterWithClientResource CLUSTER = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(getConfiguration()) + .setNumberTaskManagers(2) + .setNumberSlotsPerTaskManager(2) + .build()); + + private static Configuration getConfiguration() { + Configuration config = new Configuration(); + config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("48m")); + return config; + } + + @Test + public void testIgnoreInFlightDataDuringRecovery() throws Exception { + // given: Stream which will fail after first checkpoint. + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(3); + env.enableCheckpointing(10); + env.getCheckpointConfig().enableUnalignedCheckpoints(); + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + env.getCheckpointConfig().setCheckpointIdOfIgnoredInFlightData(1); + env.setRestartStrategy(fixedDelayRestart(2, 0)); + + env.addSource(new NumberSource()) + .shuffle() + // map for having parallel execution. + .map(new SlowMap()) + .addSink(new SumFailSink()) + // one sink for easy calculation. + .setParallelism(1); + + // when: Job is executed. + env.execute("Total sum"); + + // Calculate the expected single value after recovery. + int sourceValueAfterRestore = NumberSource.lastCheckpointedValue + 1; + + // Calculate result in case of normal recovery. + long resultWithoutIgnoringData = 0; + for (int i = 0; i <= sourceValueAfterRestore; i++) { + resultWithoutIgnoringData += i; + } + + // then: Actual result should be less than the ideal result because some of data was + // ignored. + assertThat(SumFailSink.result, lessThan(resultWithoutIgnoringData)); + + // and: Actual result should be equal to sum of result before fail + source value after + // recovery.
+ long expectedResult = SumFailSink.resultBeforeFail + sourceValueAfterRestore; + assertEquals(expectedResult, SumFailSink.result); + } + + private static class SumFailSink implements SinkFunction<Integer>, CheckpointedFunction { + public static volatile long result; + public static volatile long resultBeforeFail; + + @Override + public void invoke(Integer value) throws Exception { + result += value; + } + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + resultBeforeFail = result; + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + result = resultBeforeFail; + } + } + + private static class NumberSource implements SourceFunction<Integer>, CheckpointedFunction { + + private static final long serialVersionUID = 1L; + private ListState<Integer> valueState; + public static volatile int lastCheckpointedValue; + + @Override + public void run(SourceContext<Integer> ctx) throws Exception { + Iterator<Integer> stateIt = valueState.get().iterator(); + boolean isRecovered = stateIt.hasNext(); + + if (isRecovered) { + Integer lastValue = stateIt.next(); + + // Checking that ListState is recovered correctly. + assertEquals(lastCheckpointedValue, lastValue.intValue()); + + // if it is started after recovery, just send one more value and finish.
+ ctx.collect(lastValue + 1); + } else { + int next = 0; + while (true) { + synchronized (ctx.getCheckpointLock()) { + next++; + valueState.update(singletonList(next)); + ctx.collect(next); + } + } + } + } + + @Override + public void cancel() {} + + @Override + public void snapshotState(FunctionSnapshotContext context) throws Exception { + if (lastCheckpointedValue > 0) { + throw new RuntimeException("Error during snapshot"); + } + + lastCheckpointedValue = valueState.get().iterator().next(); + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + this.valueState = + context.getOperatorStateStore() + .getListState(new ListStateDescriptor<>("state", Types.INT)); + } + } + + private static class SlowMap extends RichMapFunction<Integer, Integer> { + + @Override + public Integer map(Integer value) throws Exception { + // slow down the map in order to have more intermediate data. + LockSupport.parkNanos(100000); + return value; + } + } +}
