This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 164a59a  [FLINK-23180] Do not initialize checkpoint base locations when checkpointing is disabled
164a59a is described below

commit 164a59ac1bb4dd39e6532478c30234eeafd76cd0
Author: Jiayi Liao <[email protected]>
AuthorDate: Fri Sep 17 10:05:36 2021 +0800

    [FLINK-23180] Do not initialize checkpoint base locations when checkpointing is disabled
    
    This fixes #17151.
    
    Co-authored-by: Jiayi Liao <[email protected]>
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  5 ++-
 .../state/CheckpointStorageCoordinatorView.java    |  7 ++--
 .../filesystem/FsCheckpointStorageAccess.java      |  2 +-
 .../MemoryBackendCheckpointStorageAccess.java      |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 29 ++++++++++++++
 .../state/StateBackendMigrationTestBase.java       |  2 +-
 ...tingCheckpointStorageAccessCoordinatorView.java |  2 +-
 ...bstractFileCheckpointStorageAccessTestBase.java |  6 +--
 .../filesystem/FsCheckpointStorageAccessTest.java  |  2 +-
 .../filesystem/FsStateBackendEntropyTest.java      |  2 +-
 .../state/testutils/BackendForTestStream.java      |  2 +-
 .../state/ttl/mock/MockCheckpointStorage.java      |  2 +-
 .../state/NonCheckpointingStorageAccess.java       |  2 +-
 .../flink/test/checkpointing/SavepointITCase.java  | 46 ++++++++++++++++++++++
 14 files changed, 94 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 47575ea..7ccfce7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -323,7 +323,10 @@ public class CheckpointCoordinator {
 
         try {
             this.checkpointStorageView = 
checkpointStorage.createCheckpointStorage(job);
-            checkpointStorageView.initializeBaseLocations();
+
+            if (isPeriodicCheckpointingConfigured()) {
+                checkpointStorageView.initializeBaseLocationsForCheckpoint();
+            }
         } catch (IOException e) {
             throw new FlinkRuntimeException(
                     "Failed to create checkpoint storage at checkpoint 
coordinator side.", e);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
index fb7d30d..131dda1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
@@ -57,17 +57,16 @@ public interface CheckpointStorageCoordinatorView {
     CompletedCheckpointStorageLocation resolveCheckpoint(String 
externalPointer) throws IOException;
 
     /**
-     * Initializes the necessary prerequisites for storage locations of 
checkpoints/savepoints.
+     * Initializes the necessary prerequisites for storage locations of 
checkpoints.
      *
      * <p>For file-based checkpoint storage, this method would initialize 
essential base checkpoint
      * directories on checkpoint coordinator side and should be executed 
before calling {@link
-     * #initializeLocationForCheckpoint(long)} and {@link 
#initializeLocationForSavepoint(long,
-     * String)}.
+     * #initializeLocationForCheckpoint(long)}.
      *
      * @throws IOException Thrown, if these base storage locations cannot be 
initialized due to an
      *     I/O exception.
      */
-    void initializeBaseLocations() throws IOException;
+    void initializeBaseLocationsForCheckpoint() throws IOException;
 
     /**
      * Initializes a storage location for new checkpoint with the given ID.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index f1c3278..9a7f7c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -109,7 +109,7 @@ public class FsCheckpointStorageAccess extends 
AbstractFsCheckpointStorageAccess
     }
 
     @Override
-    public void initializeBaseLocations() throws IOException {
+    public void initializeBaseLocationsForCheckpoint() throws IOException {
         fileSystem.mkdirs(sharedStateDirectory);
         fileSystem.mkdirs(taskOwnedStateDirectory);
         baseLocationsInitialized = true;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
index eca481b..5ff029d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
@@ -112,7 +112,7 @@ public class MemoryBackendCheckpointStorageAccess extends 
AbstractFsCheckpointSt
     }
 
     @Override
-    public void initializeBaseLocations() {
+    public void initializeBaseLocationsForCheckpoint() {
         // since 'checkpointDir' which under 'checkpointsDirectory' would be 
created when calling
         // #initializeLocationForCheckpoint, we could also avoid to call 
mkdirs for the
         // 'checkpointsDirectory' here.
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 933069a..36530b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -58,6 +59,7 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import 
org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
 import 
org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
@@ -82,6 +84,7 @@ import org.mockito.verification.VerificationMode;
 
 import javax.annotation.Nullable;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -3607,6 +3610,32 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
         assertEquals(completedCheckpointId, reportedCheckpointId.get());
     }
 
+    @Test
+    public void testBaseLocationsNotInitialized() throws Exception {
+        File checkpointDir = tmpFolder.newFolder();
+        JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID)
+                        .setTransitToRunning(false)
+                        .build();
+        CheckpointCoordinator checkpointCoordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(graph)
+                        .setCheckpointCoordinatorConfiguration(
+                                CheckpointCoordinatorConfiguration.builder()
+                                        .setCheckpointInterval(Long.MAX_VALUE)
+                                        .build())
+                        .setCheckpointStorage(new 
FsStateBackend(checkpointDir.toURI()))
+                        .build();
+        Path jobCheckpointPath =
+                new Path(checkpointDir.getAbsolutePath(), 
graph.getJobID().toString());
+        FileSystem fs = FileSystem.get(checkpointDir.toURI());
+
+        // directory will not be created if checkpointing is disabled
+        Assert.assertFalse(fs.exists(jobCheckpointPath));
+    }
+
     private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph 
graph) throws Exception {
         return new CheckpointCoordinatorBuilder()
                 .setExecutionGraph(graph)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 9960bac..bc0b122 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -1258,7 +1258,7 @@ public abstract class StateBackendMigrationTestBase<B 
extends AbstractStateBacke
         if (checkpointStorageLocation == null) {
             CheckpointStorageAccess checkpointStorageAccess =
                     getCheckpointStorage().createCheckpointStorage(new 
JobID());
-            checkpointStorageAccess.initializeBaseLocations();
+            checkpointStorageAccess.initializeBaseLocationsForCheckpoint();
             checkpointStorageLocation = 
checkpointStorageAccess.initializeLocationForCheckpoint(1L);
         }
         return checkpointStorageLocation;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
index d83609b..ff674b9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
@@ -72,7 +72,7 @@ public class TestingCheckpointStorageAccessCoordinatorView
     }
 
     @Override
-    public void initializeBaseLocations() throws IOException {}
+    public void initializeBaseLocationsForCheckpoint() throws IOException {}
 
     @Override
     public CheckpointStorageLocation initializeLocationForCheckpoint(long 
checkpointId)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
index 3a17042..8134baf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
@@ -165,9 +165,9 @@ public abstract class 
AbstractFileCheckpointStorageAccessTestBase {
         final long checkpointId = 177;
 
         final CheckpointStorageAccess storage1 = 
createCheckpointStorage(checkpointDir);
-        storage1.initializeBaseLocations();
+        storage1.initializeBaseLocationsForCheckpoint();
         final CheckpointStorageAccess storage2 = 
createCheckpointStorage(checkpointDir);
-        storage2.initializeBaseLocations();
+        storage2.initializeBaseLocationsForCheckpoint();
 
         final CheckpointStorageLocation loc1 =
                 storage1.initializeLocationForCheckpoint(checkpointId);
@@ -221,7 +221,7 @@ public abstract class 
AbstractFileCheckpointStorageAccessTestBase {
         final long checkpointId = 177;
 
         final CheckpointStorageAccess storage = 
createCheckpointStorage(randomTempPath());
-        storage.initializeBaseLocations();
+        storage.initializeBaseLocationsForCheckpoint();
         final CheckpointStorageLocation loc = 
storage.initializeLocationForCheckpoint(checkpointId);
 
         // write to the metadata file for the checkpoint
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 6b2b73c..77cb9b6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -230,7 +230,7 @@ public class FsCheckpointStorageAccessTest extends 
AbstractFileCheckpointStorage
         assertFalse(baseDir.exists());
 
         // mkdirs would only be called when initializeBaseLocations
-        storage.initializeBaseLocations();
+        storage.initializeBaseLocationsForCheckpoint();
         assertTrue(baseDir.exists());
 
         // mkdir would not be called when resolveCheckpointStorageLocation
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index b7b6424..a31a09c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -61,7 +61,7 @@ public class FsStateBackendEntropyTest {
         final FsCheckpointStorageAccess storage =
                 new FsCheckpointStorageAccess(
                         fs, checkpointDir, null, new JobID(), 
fileSizeThreshold, 4096);
-        storage.initializeBaseLocations();
+        storage.initializeBaseLocationsForCheckpoint();
 
         final FsCheckpointStorageLocation location =
                 (FsCheckpointStorageLocation) 
storage.initializeLocationForCheckpoint(96562);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index 4cf3464..9c57d0f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -89,7 +89,7 @@ public class BackendForTestStream extends MemoryStateBackend {
         }
 
         @Override
-        public void initializeBaseLocations() {
+        public void initializeBaseLocationsForCheckpoint() {
             throw new UnsupportedOperationException();
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
index da661d2..a71d9bc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
@@ -55,7 +55,7 @@ public class MockCheckpointStorage implements 
CheckpointStorage {
             }
 
             @Override
-            public void initializeBaseLocations() {}
+            public void initializeBaseLocationsForCheckpoint() {}
 
             @Override
             public CheckpointStorageLocation 
initializeLocationForCheckpoint(long checkpointId) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
index 973325e..0de86e1 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
@@ -45,7 +45,7 @@ class NonCheckpointingStorageAccess implements 
CheckpointStorageAccess {
     }
 
     @Override
-    public void initializeBaseLocations() {}
+    public void initializeBaseLocationsForCheckpoint() {}
 
     @Override
     public CheckpointStorageLocation initializeLocationForCheckpoint(long 
checkpointId) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 160fea2..42077d8 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -115,6 +115,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
@@ -135,6 +136,7 @@ import static 
org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -515,6 +517,50 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
+    @Test
+    public void testTriggerSavepointWithoutCheckpointBaseLocations() throws 
Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getCheckpointConfig().disableCheckpointing();
+        env.setParallelism(1);
+
+        env.addSource(new IntegerStreamSource()).addSink(new 
DiscardingSink<>());
+
+        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+        Configuration config = getFileBasedCheckpointsConfig();
+        config.addAll(jobGraph.getJobConfiguration());
+
+        MiniClusterWithClientResource cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(config)
+                                .setNumberTaskManagers(1)
+                                .setNumberSlotsPerTaskManager(1)
+                                .build());
+        cluster.before();
+        ClusterClient<?> client = cluster.getClusterClient();
+
+        String savepointPath = null;
+        try {
+            client.submitJob(jobGraph).get();
+
+            waitForAllTaskRunning(cluster.getMiniCluster(), 
jobGraph.getJobID(), false);
+
+            savepointPath = client.triggerSavepoint(jobGraph.getJobID(), 
null).get();
+
+            assertNotNull(savepointPath);
+
+            client.cancel(jobGraph.getJobID()).get();
+            // checkpoint directory should not be initialized
+            assertEquals(0, 
Objects.requireNonNull(checkpointDir.listFiles()).length);
+        } finally {
+            if (null != savepointPath) {
+                client.disposeSavepoint(savepointPath);
+            }
+            cluster.after();
+        }
+    }
+
     static class BoundedPassThroughOperator<T> extends 
AbstractStreamOperator<T>
             implements OneInputStreamOperator<T, T>, BoundedOneInput {
         static volatile CountDownLatch progressLatch;

Reply via email to