This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b19cc312ff969559d269a9332ea05e6a48ec6f59
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Thu Dec 9 16:46:56 2021 +0100

    [FLINK-25191] Skip savepoints for recovery
    
    This closes #18092
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  52 +++++---
 .../CheckpointCoordinatorFailureTest.java          |   6 +-
 .../checkpoint/CheckpointCoordinatorTest.java      |  78 +++---------
 .../flink/test/checkpointing/SavepointITCase.java  | 132 ++++++++++++---------
 .../org.apache.flink.core.fs.FileSystemFactory     |  16 +++
 5 files changed, 146 insertions(+), 138 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 8f2ebfd..ecba75d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1200,14 +1200,17 @@ public class CheckpointCoordinator {
      */
     private void completePendingCheckpoint(PendingCheckpoint pendingCheckpoint)
             throws CheckpointException {
-        final long checkpointId = pendingCheckpoint.getCheckpointId();
+        final long checkpointId = pendingCheckpoint.getCheckpointID();
         final CompletedCheckpoint completedCheckpoint;
         final CompletedCheckpoint lastSubsumed;
+        final CheckpointProperties props = pendingCheckpoint.getProps();
 
         // As a first step to complete the checkpoint, we register its state 
with the registry
-        Map<OperatorID, OperatorState> operatorStates = 
pendingCheckpoint.getOperatorStates();
-        SharedStateRegistry sharedStateRegistry = 
completedCheckpointStore.getSharedStateRegistry();
-        sharedStateRegistry.registerAll(operatorStates.values());
+        // we do not register savepoints' shared state, as Flink is not in 
charge of savepoints'
+        // lifecycle
+        if (!props.isSavepoint()) {
+            registerSharedStates(pendingCheckpoint);
+        }
 
         try {
             completedCheckpoint = finalizeCheckpoint(pendingCheckpoint);
@@ -1215,33 +1218,46 @@ public class CheckpointCoordinator {
             // the pending checkpoint must be discarded after the finalization
             Preconditions.checkState(pendingCheckpoint.isDisposed() && 
completedCheckpoint != null);
 
-            lastSubsumed =
-                    addCompletedCheckpointToStoreAndSubsumeOldest(
-                            checkpointId,
-                            completedCheckpoint,
-                            
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo());
+            if (!props.isSavepoint()) {
+                lastSubsumed =
+                        addCompletedCheckpointToStoreAndSubsumeOldest(
+                                checkpointId,
+                                completedCheckpoint,
+                                
pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo());
+            } else {
+                lastSubsumed = null;
+            }
         } finally {
             pendingCheckpoints.remove(checkpointId);
             scheduleTriggerRequest();
         }
 
+        // remember recent checkpoint id for debugging purposes
         rememberRecentCheckpointId(checkpointId);
 
-        // drop those pending checkpoints that are at prior to the completed 
one
-        dropSubsumedCheckpoints(checkpointId);
-
         // record the time when this was completed, to calculate
         // the 'min delay between checkpoints'
         lastCheckpointCompletionRelativeTime = clock.relativeTimeMillis();
 
         logCheckpointInfo(completedCheckpoint);
 
-        // send the "notify complete" call to all vertices, coordinators, etc.
-        sendAcknowledgeMessages(
-                pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
-                checkpointId,
-                completedCheckpoint.getTimestamp(),
-                extractIdIfDiscardedOnSubsumed(lastSubsumed));
+        if (!props.isSavepoint() || props.isSynchronous()) {
+            // drop those pending checkpoints that are at prior to the 
completed one
+            dropSubsumedCheckpoints(checkpointId);
+
+            // send the "notify complete" call to all vertices, coordinators, 
etc.
+            sendAcknowledgeMessages(
+                    pendingCheckpoint.getCheckpointPlan().getTasksToCommitTo(),
+                    checkpointId,
+                    completedCheckpoint.getTimestamp(),
+                    extractIdIfDiscardedOnSubsumed(lastSubsumed));
+        }
+    }
+
+    private void registerSharedStates(PendingCheckpoint pendingCheckpoint) {
+        Map<OperatorID, OperatorState> operatorStates = 
pendingCheckpoint.getOperatorStates();
+        SharedStateRegistry sharedStateRegistry = 
completedCheckpointStore.getSharedStateRegistry();
+        sharedStateRegistry.registerAll(operatorStates.values());
     }
 
     private void logCheckpointInfo(CompletedCheckpoint completedCheckpoint) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 9205286..c02ee16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -41,9 +41,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
 
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 import java.util.Collections;
 import java.util.List;
@@ -65,8 +63,6 @@ import static org.mockito.Mockito.when;
 /** Tests for failure of checkpoint coordinator. */
 public class CheckpointCoordinatorFailureTest extends TestLogger {
 
-    @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
-
     /**
      * Tests that a failure while storing a completed checkpoint in the 
completed checkpoint store
      * will properly fail the originating pending checkpoint and clean upt the 
completed checkpoint.
@@ -231,7 +227,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
                         .setCompletedCheckpointStore(completedCheckpointStore)
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .build();
-        
checkpointCoordinator.triggerSavepoint(tmpFolder.newFolder().getAbsolutePath());
+        checkpointCoordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
         try {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 7cfafb6..4dc4869 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -120,6 +120,7 @@ import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.PERIOD
 import static 
org.apache.flink.runtime.checkpoint.CheckpointStoreUtil.INVALID_CHECKPOINT_ID;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
@@ -2018,67 +2019,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
         assertNotNull(savepointFuture.get());
 
         // the now we should have a completed checkpoint
-        assertEquals(1, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        // savepoints should not registered as retained checkpoints
+        assertEquals(0, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
         assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
         // validate that the relevant tasks got a confirmation message
         for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
             ExecutionAttemptID attemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
             assertEquals(checkpointId, 
gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
+            
assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
         }
 
-        // validate that the shared states are registered
-        {
-            verify(subtaskState1, 
times(1)).registerSharedStates(any(SharedStateRegistry.class));
-            verify(subtaskState2, 
times(1)).registerSharedStates(any(SharedStateRegistry.class));
-        }
-
-        CompletedCheckpoint success = 
checkpointCoordinator.getSuccessfulCheckpoints().get(0);
+        CompletedCheckpoint success = savepointFuture.get();
         assertEquals(graph.getJobID(), success.getJobId());
         assertEquals(pending.getCheckpointId(), success.getCheckpointID());
         assertEquals(2, success.getOperatorStates().size());
 
-        // ---------------
-        // trigger another checkpoint and see that this one replaces the other 
checkpoint
-        // ---------------
-        gateway.resetCount();
-        savepointFuture = checkpointCoordinator.triggerSavepoint(savepointDir);
-        manuallyTriggeredScheduledExecutor.triggerAll();
-        assertFalse(savepointFuture.isDone());
-
-        long checkpointIdNew =
-                
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
-        checkpointCoordinator.receiveAcknowledgeMessage(
-                new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, 
checkpointIdNew),
-                TASK_MANAGER_LOCATION_INFO);
-        checkpointCoordinator.receiveAcknowledgeMessage(
-                new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, 
checkpointIdNew),
-                TASK_MANAGER_LOCATION_INFO);
-
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(1, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
-
-        CompletedCheckpoint successNew = 
checkpointCoordinator.getSuccessfulCheckpoints().get(0);
-        assertEquals(graph.getJobID(), successNew.getJobId());
-        assertEquals(checkpointIdNew, successNew.getCheckpointID());
-        assertEquals(2, successNew.getOperatorStates().size());
-        
assertTrue(successNew.getOperatorStates().values().stream().allMatch(this::hasNoSubState));
-        assertNotNull(savepointFuture.get());
-
-        // validate that the first savepoint does not discard its private 
states.
-        verify(subtaskState1, never()).discardState();
-        verify(subtaskState2, never()).discardState();
-
-        // validate that the relevant tasks got a confirmation message
-        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
-            ExecutionAttemptID attemptId = 
vertex.getCurrentExecutionAttempt().getAttemptId();
-            assertEquals(
-                    checkpointIdNew, 
gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId);
-            assertEquals(
-                    checkpointIdNew,
-                    
gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId);
-        }
-
         checkpointCoordinator.shutdown();
     }
 
@@ -2176,7 +2132,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
         FutureUtils.throwIfCompletedExceptionally(savepointFuture2);
         assertEquals(3, checkpointCoordinator.getNumberOfPendingCheckpoints());
 
-        // 2nd savepoint should subsume the last checkpoint, but not the 1st 
savepoint
+        // savepoints should not subsume checkpoints
         checkpointCoordinator.receiveAcknowledgeMessage(
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, 
savepointId2),
                 TASK_MANAGER_LOCATION_INFO);
@@ -2184,13 +2140,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, 
savepointId2),
                 TASK_MANAGER_LOCATION_INFO);
 
-        // currently, we do not subsume a checkpoint after a savepoint 
completed to avoid data lost.
-        verify(checkpointCoordinator, times(1))
-                .sendAcknowledgeMessages(
-                        anyList(), eq(savepointId2), anyLong(), 
eq(INVALID_CHECKPOINT_ID));
+        // we do not send notify checkpoint complete for savepoints
+        verify(checkpointCoordinator, times(0))
+                .sendAcknowledgeMessages(anyList(), eq(savepointId2), 
anyLong(), anyLong());
 
-        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(2, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertEquals(2, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertEquals(1, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
 
         
assertFalse(checkpointCoordinator.getPendingCheckpoints().get(savepointId1).isDisposed());
 
@@ -2205,13 +2160,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
                 new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, 
savepointId1),
                 TASK_MANAGER_LOCATION_INFO);
 
-        // savepoint should not be subsumed.
-        verify(checkpointCoordinator, times(1))
-                .sendAcknowledgeMessages(
-                        anyList(), eq(savepointId1), anyLong(), 
eq(INVALID_CHECKPOINT_ID));
+        // we do not send notify checkpoint complete for savepoints
+        verify(checkpointCoordinator, times(0))
+                .sendAcknowledgeMessages(anyList(), eq(savepointId1), 
anyLong(), anyLong());
 
-        assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
-        assertEquals(2, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
+        assertEquals(1, checkpointCoordinator.getNumberOfPendingCheckpoints());
+        assertEquals(1, 
checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
         assertNotNull(savepointFuture1.get());
 
         CompletableFuture<CompletedCheckpoint> checkpointFuture4 =
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 547c103..16b0023 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -36,29 +36,24 @@ import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.StateBackendOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
@@ -124,7 +119,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -136,6 +130,7 @@ import static org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult;
 import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
+import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -827,15 +822,12 @@ public class SavepointITCase extends TestLogger {
 
     @Test
     public void 
testStopWithSavepointWithDrainGlobalFailoverIfSavepointAborted() throws 
Exception {
-        Configuration configuration = new Configuration();
-        configuration.setString(
-                HighAvailabilityOptions.HA_MODE, 
FailingSyncSavepointHAFactory.class.getName());
         final int parallelism = 2;
+        PathFailingFileSystem.resetFailingPath(savepointDir.getAbsolutePath() 
+ ".*/_metadata");
         MiniClusterWithClientResource cluster =
                 new MiniClusterWithClientResource(
                         new MiniClusterResourceConfiguration.Builder()
                                 .setNumberSlotsPerTaskManager(parallelism)
-                                .setConfiguration(configuration)
                                 .build());
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -855,11 +847,19 @@ public class SavepointITCase extends TestLogger {
             waitUntilAllTasksAreRunning(cluster.getRestClusterClient(), 
jobGraph.getJobID());
 
             try {
-                client.stopWithSavepoint(jobGraph.getJobID(), true, 
savepointDir.getAbsolutePath())
+                client.stopWithSavepoint(
+                                jobGraph.getJobID(),
+                                true,
+                                PathFailingFileSystem.SCHEME
+                                        + "://"
+                                        + savepointDir.getAbsolutePath())
                         .get();
                 fail("The future should fail exceptionally.");
-            } catch (ExecutionException ignored) {
+            } catch (ExecutionException ex) {
                 // expected
+                if (!findThrowableWithMessage(ex, "Expected IO 
exception").isPresent()) {
+                    throw ex;
+                }
             }
 
             // make sure that we restart all tasks after the savepoint failure
@@ -869,42 +869,6 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
-    private static class FailingSyncSavepointCompletedCheckpointStore
-            extends StandaloneCompletedCheckpointStore {
-        FailingSyncSavepointCompletedCheckpointStore() {
-            super(1);
-        }
-
-        @Override
-        public CompletedCheckpoint addCheckpointAndSubsumeOldestOne(
-                CompletedCheckpoint checkpoint,
-                CheckpointsCleaner checkpointsCleaner,
-                Runnable postCleanup)
-                throws Exception {
-            if (checkpoint.getProperties().isSynchronous()) {
-                throw new ExpectedTestException();
-            } else {
-                return super.addCheckpointAndSubsumeOldestOne(
-                        checkpoint, checkpointsCleaner, postCleanup);
-            }
-        }
-    }
-
-    /**
-     * A factory for HA services used to inject {@link
-     * FailingSyncSavepointCompletedCheckpointStore}.
-     */
-    public static class FailingSyncSavepointHAFactory implements 
HighAvailabilityServicesFactory {
-        @Override
-        public HighAvailabilityServices createHAServices(
-                Configuration configuration, Executor executor) {
-            final CheckpointRecoveryFactory checkpointRecoveryFactory =
-                    
PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
-                            maxCheckpoints -> new 
FailingSyncSavepointCompletedCheckpointStore());
-            return new EmbeddedHaServicesWithLeadershipControl(executor, 
checkpointRecoveryFactory);
-        }
-    }
-
     private static BiFunction<JobID, ExecutionException, Boolean>
             assertAfterSnapshotCreationFailure() {
         return (jobId, actualException) -> {
@@ -1664,4 +1628,66 @@ public class SavepointITCase extends TestLogger {
             throw new RuntimeException(e);
         }
     }
+
+    /** {@link LocalFileSystem} that fails open/create for paths matching a configurable regex. */
+    public static class PathFailingFileSystem extends LocalFileSystem {
+
+        public static final String SCHEME = "failPath";
+
+        private static volatile String failingPathRegex; // null disables failing
+
+        public static void resetFailingPath(String regex) {
+            failingPathRegex = regex;
+        }
+
+        @Override
+        public FSDataInputStream open(org.apache.flink.core.fs.Path f, int bufferSize)
+                throws IOException {
+            failPath(f);
+            return super.open(f, bufferSize);
+        }
+
+        @Override
+        public FSDataInputStream open(org.apache.flink.core.fs.Path f) throws IOException {
+            failPath(f);
+            return super.open(f);
+        }
+
+        @Override
+        public FSDataOutputStream create(
+                final org.apache.flink.core.fs.Path filePath, final WriteMode overwrite)
+                throws IOException {
+            failPath(filePath);
+            return super.create(filePath, overwrite);
+        }
+
+        private void failPath(org.apache.flink.core.fs.Path filePath) throws IOException {
+            if (failingPathRegex != null && filePath.getPath().matches(failingPathRegex)) {
+                throw new IOException("Expected IO exception for path: " + filePath);
+            }
+        }
+
+        @Override
+        public URI getUri() {
+            return URI.create(SCHEME + ":///");
+        }
+    }
+    // ------------------------------------------------------------------------
+
+    /**
+     * Service-loaded {@link FileSystemFactory} that maps the {@code failPath} scheme to a new
+     * {@link PathFailingFileSystem} instance on every {@link #create} call.
+     */
+    public static final class PathFailingFileSystemFactory implements FileSystemFactory {
+
+        @Override
+        public FileSystem create(final URI uri) throws IOException {
+            return new PathFailingFileSystem();
+        }
+
+        @Override
+        public String getScheme() {
+            return PathFailingFileSystem.SCHEME;
+        }
+    }
 }
diff --git a/flink-tests/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-tests/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..b615736
--- /dev/null
+++ b/flink-tests/src/test/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.test.checkpointing.SavepointITCase$PathFailingFileSystemFactory

Reply via email to