This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b19cc312ff969559d269a9332ea05e6a48ec6f59 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Thu Dec 9 16:46:56 2021 +0100 [FLINK-25191] Skip savepoints for recovery This closes #18092 --- .../runtime/checkpoint/CheckpointCoordinator.java | 52 +++++--- .../CheckpointCoordinatorFailureTest.java | 6 +- .../checkpoint/CheckpointCoordinatorTest.java | 78 +++--------- .../flink/test/checkpointing/SavepointITCase.java | 132 ++++++++++++--------- .../org.apache.flink.core.fs.FileSystemFactory | 16 +++ 5 files changed, 146 insertions(+), 138 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 8f2ebfd..ecba75d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -1200,14 +1200,17 @@ public class CheckpointCoordinator { */ private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint) throws CheckpointException { - final long checkpointId = pendingCheckpoint.getCheckpointId(); + final long checkpointId = pendingCheckpoint.getCheckpointID(); final CompletedCheckpoint completedCheckpoint; final CompletedCheckpoint lastSubsumed; + final CheckpointProperties props = pendingCheckpoint.getProps(); // As a first step to complete the checkpoint, we register its state with the registry - Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates(); - SharedStateRegistry sharedStateRegistry = completedCheckpointStore.getSharedStateRegistry(); - sharedStateRegistry.registerAll(operatorStates.values()); + // we do not register savepoints' shared state, as Flink is not in charge of savepoints' + // lifecycle + if (!props.isSavepoint()) { + registerSharedStates(pendingCheckpoint); + } try { completedCheckpoint = finalizeCheckpoint(pendingCheckpoint); @@ -1215,33 +1218,46 @@ public class CheckpointCoordinator { // the pending checkpoint must be discarded after the finalization Preconditions.checkState(pendingCheckpoint.isDisposed() && completedCheckpoint != null); - lastSubsumed = - addCompletedCheckpointToStoreAndSubsumeOldest( - checkpointId, - completedCheckpoint, - pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo()); + if (!props.isSavepoint()) { + lastSubsumed = + addCompletedCheckpointToStoreAndSubsumeOldest( + checkpointId, + completedCheckpoint, + pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo()); + } else { + lastSubsumed = null; + } } finally { pendingCheckpoints.remove(checkpointId); scheduleTriggerRequest(); } + // remember recent checkpoint id for debugging purposes rememberRecentCheckpointId(checkpointId); - // drop those pending checkpoints that are at prior to the completed one - dropSubsumedCheckpoints(checkpointId); - // record the time when this was completed, to calculate // the 'min delay between checkpoints' lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis(); logCheckpointInfo(completedCheckpoint); - // send the "notify complete" call to all vertices, coordinators, etc. - sendAcknowledgeMessages( - pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), - checkpointId, - completedCheckpoint.getTimestamp(), - extractIdIfDiscardedOnSubsumed(lastSubsumed)); + if (!props.isSavepoint() || props.isSynchronous()) { + // drop those pending checkpoints that are at prior to the completed one + dropSubsumedCheckpoints(checkpointId); + + // send the "notify complete" call to all vertices, coordinators, etc. + sendAcknowledgeMessages( + pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(), + checkpointId, + completedCheckpoint.getTimestamp(), + extractIdIfDiscardedOnSubsumed(lastSubsumed)); + } + } + + private void registerSharedStates(PendingCheckpoint pendingCheckpoint) { + Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates(); + SharedStateRegistry sharedStateRegistry = completedCheckpointStore.getSharedStateRegistry(); + sharedStateRegistry.registerAll(operatorStates.values()); } private void logCheckpointInfo(CompletedCheckpoint completedCheckpoint) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 9205286..c02ee16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -41,9 +41,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.Executors; import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import java.util.Collections; import java.util.List; @@ -65,8 +63,6 @@ import static org.mockito.Mockito.when; /** Tests for failure of checkpoint coordinator. */ public class CheckpointCoordinatorFailureTest extends TestLogger { - @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); - /** * Tests that a failure while storing a completed checkpoint in the completed checkpoint store * will properly fail the originating pending checkpoint and clean upt the completed checkpoint. @@ -231,7 +227,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger { .setCompletedCheckpointStore(completedCheckpointStore) .setTimer(manuallyTriggeredScheduledExecutor) .build(); - checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath()); + checkpointCoordinator.triggerCheckpoint(false); manuallyTriggeredScheduledExecutor.triggerAll(); try { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 7cfafb6..4dc4869 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -120,6 +120,7 @@ import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.PERIOD import static org.apache.flink.runtime.checkpoint.CheckpointStoreUtil.INVALID_CHECKPOINT_ID; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -2018,67 +2019,22 @@ public class CheckpointCoordinatorTest extends TestLogger { assertNotNull(savepointFuture.get()); // the now we should have a completed checkpoint - assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); + // savepoints should not registered as retained checkpoints + assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); // validate that the relevant tasks got a confirmation message for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) { ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); assertEquals(checkpointId, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId); + assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty(); } - // validate that the shared states are registered - { - verify(subtaskState1, times(1)).registerSharedStates(any(SharedStateRegistry.class)); - verify(subtaskState2, times(1)).registerSharedStates(any(SharedStateRegistry.class)); - } - - CompletedCheckpoint success = checkpointCoordinator.getSuccessfulCheckpoints().get(0); + CompletedCheckpoint success = savepointFuture.get(); assertEquals(graph.getJobID(), success.getJobId()); assertEquals(pending.getCheckpointId(), success.getCheckpointID()); assertEquals(2, success.getOperatorStates().size()); - // --------------- - // trigger another checkpoint and see that this one replaces the other checkpoint - // --------------- - gateway.resetCount(); - savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir); - manuallyTriggeredScheduledExecutor.triggerAll(); - assertFalse(savepointFuture.isDone()); - - long checkpointIdNew = - checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey(); - checkpointCoordinator.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointIdNew), - TASK_MANAGER_LOCATION_INFO); - checkpointCoordinator.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointIdNew), - TASK_MANAGER_LOCATION_INFO); - - assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); - assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); - - CompletedCheckpoint successNew = checkpointCoordinator.getSuccessfulCheckpoints().get(0); - assertEquals(graph.getJobID(), successNew.getJobId()); - assertEquals(checkpointIdNew, successNew.getCheckpointID()); - assertEquals(2, successNew.getOperatorStates().size()); - assertTrue(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState)); - assertNotNull(savepointFuture.get()); - - // validate that the first savepoint does not discard its private states. - verify(subtaskState1, never()).discardState(); - verify(subtaskState2, never()).discardState(); - - // validate that the relevant tasks got a confirmation message - for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) { - ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId(); - assertEquals( - checkpointIdNew, gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId); - assertEquals( - checkpointIdNew, - gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId); - } - checkpointCoordinator.shutdown(); } @@ -2176,7 +2132,7 @@ public class CheckpointCoordinatorTest extends TestLogger { FutureUtils.throwIfCompletedExceptionally(savepointFuture2); assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints()); - // 2nd savepoint should subsume the last checkpoint, but not the 1st savepoint + // savepoints should not subsume checkpoints checkpointCoordinator.receiveAcknowledgeMessage( new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO); @@ -2184,13 +2140,12 @@ public class CheckpointCoordinatorTest extends TestLogger { new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO); - // currently, we do not subsume a checkpoint after a savepoint completed to avoid data lost. - verify(checkpointCoordinator, times(1)) - .sendAcknowledgeMessages( - anyList(), eq(savepointId2), anyLong(), eq(INVALID_CHECKPOINT_ID)); + // we do not send notify checkpoint complete for savepoints + verify(checkpointCoordinator, times(0)) + .sendAcknowledgeMessages(anyList(), eq(savepointId2), anyLong(), anyLong()); - assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); - assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints()); + assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed()); @@ -2205,13 +2160,12 @@ public class CheckpointCoordinatorTest extends TestLogger { new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO); - // savepoint should not be subsumed. - verify(checkpointCoordinator, times(1)) - .sendAcknowledgeMessages( - anyList(), eq(savepointId1), anyLong(), eq(INVALID_CHECKPOINT_ID)); + // we do not send notify checkpoint complete for savepoints + verify(checkpointCoordinator, times(0)) + .sendAcknowledgeMessages(anyList(), eq(savepointId1), anyLong(), anyLong()); - assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints()); - assertEquals(2, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); + assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints()); + assertEquals(1, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()); assertNotNull(savepointFuture1.get()); CompletableFuture<CompletedCheckpoint> checkpointFuture4 = diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java index 547c103..16b0023 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java @@ -36,29 +36,24 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.ClusterOptions; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointException; -import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; -import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory; -import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory; -import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.messages.FlinkJobNotFoundException; -import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders; @@ -124,7 +119,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.stream.Collectors; @@ -136,6 +130,7 @@ import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult; import static org.apache.flink.util.ExceptionUtils.assertThrowable; import static org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage; import static org.apache.flink.util.ExceptionUtils.findThrowable; +import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -827,15 +822,12 @@ public class SavepointITCase extends TestLogger { @Test public void testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted() throws Exception { - Configuration configuration = new Configuration(); - configuration.setString( - HighAvailabilityOptions.HA_MODE, FailingSyncSavepointHAFactory.class.getName()); final int parallelism = 2; + PathFailingFileSystem.resetFailingPath(savepointDir.getAbsolutePath() + ".*/_metadata"); MiniClusterWithClientResource cluster = new MiniClusterWithClientResource( new MiniClusterResourceConfiguration.Builder() .setNumberSlotsPerTaskManager(parallelism) - .setConfiguration(configuration) .build()); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -855,11 +847,19 @@ public class SavepointITCase extends TestLogger { waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), jobGraph.getJobID()); try { - client.stopWithSavepoint(jobGraph.getJobID(), true, savepointDir.getAbsolutePath()) + client.stopWithSavepoint( + jobGraph.getJobID(), + true, + PathFailingFileSystem.SCHEME + + "://" + + savepointDir.getAbsolutePath()) .get(); fail("The future should fail exceptionally."); - } catch (ExecutionException ignored) { + } catch (ExecutionException ex) { // expected + if (!findThrowableWithMessage(ex, "Expected IO exception").isPresent()) { + throw ex; + } } // make sure that we restart all tasks after the savepoint failure @@ -869,42 +869,6 @@ public class SavepointITCase extends TestLogger { } } - private static class FailingSyncSavepointCompletedCheckpointStore - extends StandaloneCompletedCheckpointStore { - FailingSyncSavepointCompletedCheckpointStore() { - super(1); - } - - @Override - public CompletedCheckpoint addCheckpointAndSubsumeOldestOne( - CompletedCheckpoint checkpoint, - CheckpointsCleaner checkpointsCleaner, - Runnable postCleanup) - throws Exception { - if (checkpoint.getProperties().isSynchronous()) { - throw new ExpectedTestException(); - } else { - return super.addCheckpointAndSubsumeOldestOne( - checkpoint, checkpointsCleaner, postCleanup); - } - } - } - - /** - * A factory for HA services used to inject {@link - * FailingSyncSavepointCompletedCheckpointStore}. - */ - public static class FailingSyncSavepointHAFactory implements HighAvailabilityServicesFactory { - @Override - public HighAvailabilityServices createHAServices( - Configuration configuration, Executor executor) { - final CheckpointRecoveryFactory checkpointRecoveryFactory = - PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery( - maxCheckpoints -> new FailingSyncSavepointCompletedCheckpointStore()); - return new EmbeddedHaServicesWithLeadershipControl(executor, checkpointRecoveryFactory); - } - } - private static BiFunction<JobID, ExecutionException, Boolean> assertAfterSnapshotCreationFailure() { return (jobId, actualException) -> { @@ -1664,4 +1628,66 @@ public class SavepointITCase extends TestLogger { throw new RuntimeException(e); } } + + /** A test file system. It will fail when trying to perform actions on a statically set path. */ + public static class PathFailingFileSystem extends LocalFileSystem { + + public static final String SCHEME = "failPath"; + + private static String failingPathRegex; + + public static void resetFailingPath(String regex) { + failingPathRegex = regex; + } + + @Override + public FSDataInputStream open(org.apache.flink.core.fs.Path f, int bufferSize) + throws IOException { + failPath(f); + return super.open(f, bufferSize); + } + + @Override + public FSDataInputStream open(org.apache.flink.core.fs.Path f) throws IOException { + failPath(f); + return super.open(f); + } + + @Override + public FSDataOutputStream create( + final org.apache.flink.core.fs.Path filePath, final WriteMode overwrite) + throws IOException { + failPath(filePath); + return super.create(filePath, overwrite); + } + + private void failPath(org.apache.flink.core.fs.Path filePath) throws IOException { + if (filePath.getPath().matches(failingPathRegex)) { + throw new IOException("Expected IO exception for path: " + failingPathRegex); + } + } + + @Override + public URI getUri() { + return URI.create(SCHEME + ":///"); + } + } + // ------------------------------------------------------------------------ + + /** + * A factory for {@link + * org.apache.flink.test.checkpointing.SavepointITCase.PathFailingFileSystem}. + */ + public static final class PathFailingFileSystemFactory implements FileSystemFactory { + + @Override + public String getScheme() { + return PathFailingFileSystem.SCHEME; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + return new PathFailingFileSystem(); + } + } } diff --git a/flink-tests/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-tests/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory new file mode 100644 index 0000000..b615736 --- /dev/null +++ b/flink-tests/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.test.checkpointing.SavepointITCase$PathFailingFileSystemFactory
