This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 164a59a [FLINK-23180] Do not initialize checkpoint base locations
when checkpointing is disabled
164a59a is described below
commit 164a59ac1bb4dd39e6532478c30234eeafd76cd0
Author: Jiayi Liao <[email protected]>
AuthorDate: Fri Sep 17 10:05:36 2021 +0800
[FLINK-23180] Do not initialize checkpoint base locations when
checkpointing is disabled
This fixes #17151.
Co-authored-by: Jiayi Liao <[email protected]>
---
.../runtime/checkpoint/CheckpointCoordinator.java | 5 ++-
.../state/CheckpointStorageCoordinatorView.java | 7 ++--
.../filesystem/FsCheckpointStorageAccess.java | 2 +-
.../MemoryBackendCheckpointStorageAccess.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 29 ++++++++++++++
.../state/StateBackendMigrationTestBase.java | 2 +-
...tingCheckpointStorageAccessCoordinatorView.java | 2 +-
...bstractFileCheckpointStorageAccessTestBase.java | 6 +--
.../filesystem/FsCheckpointStorageAccessTest.java | 2 +-
.../filesystem/FsStateBackendEntropyTest.java | 2 +-
.../state/testutils/BackendForTestStream.java | 2 +-
.../state/ttl/mock/MockCheckpointStorage.java | 2 +-
.../state/NonCheckpointingStorageAccess.java | 2 +-
.../flink/test/checkpointing/SavepointITCase.java | 46 ++++++++++++++++++++++
14 files changed, 94 insertions(+), 17 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 47575ea..7ccfce7 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -323,7 +323,10 @@ public class CheckpointCoordinator {
try {
this.checkpointStorageView =
checkpointStorage.createCheckpointStorage(job);
- checkpointStorageView.initializeBaseLocations();
+
+ if (isPeriodicCheckpointingConfigured()) {
+ checkpointStorageView.initializeBaseLocationsForCheckpoint();
+ }
} catch (IOException e) {
throw new FlinkRuntimeException(
"Failed to create checkpoint storage at checkpoint
coordinator side.", e);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
index fb7d30d..131dda1 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageCoordinatorView.java
@@ -57,17 +57,16 @@ public interface CheckpointStorageCoordinatorView {
CompletedCheckpointStorageLocation resolveCheckpoint(String
externalPointer) throws IOException;
/**
- * Initializes the necessary prerequisites for storage locations of
checkpoints/savepoints.
+ * Initializes the necessary prerequisites for storage locations of
checkpoints.
*
* <p>For file-based checkpoint storage, this method would initialize
essential base checkpoint
* directories on checkpoint coordinator side and should be executed
before calling {@link
- * #initializeLocationForCheckpoint(long)} and {@link
#initializeLocationForSavepoint(long,
- * String)}.
+ * #initializeLocationForCheckpoint(long)}.
*
* @throws IOException Thrown, if these base storage locations cannot be
initialized due to an
* I/O exception.
*/
- void initializeBaseLocations() throws IOException;
+ void initializeBaseLocationsForCheckpoint() throws IOException;
/**
* Initializes a storage location for new checkpoint with the given ID.
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index f1c3278..9a7f7c8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -109,7 +109,7 @@ public class FsCheckpointStorageAccess extends
AbstractFsCheckpointStorageAccess
}
@Override
- public void initializeBaseLocations() throws IOException {
+ public void initializeBaseLocationsForCheckpoint() throws IOException {
fileSystem.mkdirs(sharedStateDirectory);
fileSystem.mkdirs(taskOwnedStateDirectory);
baseLocationsInitialized = true;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
index eca481b..5ff029d 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorageAccess.java
@@ -112,7 +112,7 @@ public class MemoryBackendCheckpointStorageAccess extends
AbstractFsCheckpointSt
}
@Override
- public void initializeBaseLocations() {
+ public void initializeBaseLocationsForCheckpoint() {
// since 'checkpointDir' which under 'checkpointsDirectory' would be
created when calling
// #initializeLocationForCheckpoint, we could also avoid to call
mkdirs for the
// 'checkpointsDirectory' here.
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 933069a..36530b0 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
@@ -58,6 +59,7 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import
org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import
org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
@@ -82,6 +84,7 @@ import org.mockito.verification.VerificationMode;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -3607,6 +3610,32 @@ public class CheckpointCoordinatorTest extends
TestLogger {
assertEquals(completedCheckpointId, reportedCheckpointId.get());
}
+ @Test
+ public void testBaseLocationsNotInitialized() throws Exception {
+ File checkpointDir = tmpFolder.newFolder();
+ JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID)
+ .setTransitToRunning(false)
+ .build();
+ CheckpointCoordinator checkpointCoordinator =
+ new CheckpointCoordinatorBuilder()
+ .setExecutionGraph(graph)
+ .setCheckpointCoordinatorConfiguration(
+ CheckpointCoordinatorConfiguration.builder()
+ .setCheckpointInterval(Long.MAX_VALUE)
+ .build())
+ .setCheckpointStorage(new
FsStateBackend(checkpointDir.toURI()))
+ .build();
+ Path jobCheckpointPath =
+ new Path(checkpointDir.getAbsolutePath(),
graph.getJobID().toString());
+ FileSystem fs = FileSystem.get(checkpointDir.toURI());
+
+ // directory will not be created if checkpointing is disabled
+ Assert.assertFalse(fs.exists(jobCheckpointPath));
+ }
+
private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph
graph) throws Exception {
return new CheckpointCoordinatorBuilder()
.setExecutionGraph(graph)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
index 9960bac..bc0b122 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendMigrationTestBase.java
@@ -1258,7 +1258,7 @@ public abstract class StateBackendMigrationTestBase<B
extends AbstractStateBacke
if (checkpointStorageLocation == null) {
CheckpointStorageAccess checkpointStorageAccess =
getCheckpointStorage().createCheckpointStorage(new
JobID());
- checkpointStorageAccess.initializeBaseLocations();
+ checkpointStorageAccess.initializeBaseLocationsForCheckpoint();
checkpointStorageLocation =
checkpointStorageAccess.initializeLocationForCheckpoint(1L);
}
return checkpointStorageLocation;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
index d83609b..ff674b9 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestingCheckpointStorageAccessCoordinatorView.java
@@ -72,7 +72,7 @@ public class TestingCheckpointStorageAccessCoordinatorView
}
@Override
- public void initializeBaseLocations() throws IOException {}
+ public void initializeBaseLocationsForCheckpoint() throws IOException {}
@Override
public CheckpointStorageLocation initializeLocationForCheckpoint(long
checkpointId)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
index 3a17042..8134baf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileCheckpointStorageAccessTestBase.java
@@ -165,9 +165,9 @@ public abstract class
AbstractFileCheckpointStorageAccessTestBase {
final long checkpointId = 177;
final CheckpointStorageAccess storage1 =
createCheckpointStorage(checkpointDir);
- storage1.initializeBaseLocations();
+ storage1.initializeBaseLocationsForCheckpoint();
final CheckpointStorageAccess storage2 =
createCheckpointStorage(checkpointDir);
- storage2.initializeBaseLocations();
+ storage2.initializeBaseLocationsForCheckpoint();
final CheckpointStorageLocation loc1 =
storage1.initializeLocationForCheckpoint(checkpointId);
@@ -221,7 +221,7 @@ public abstract class
AbstractFileCheckpointStorageAccessTestBase {
final long checkpointId = 177;
final CheckpointStorageAccess storage =
createCheckpointStorage(randomTempPath());
- storage.initializeBaseLocations();
+ storage.initializeBaseLocationsForCheckpoint();
final CheckpointStorageLocation loc =
storage.initializeLocationForCheckpoint(checkpointId);
// write to the metadata file for the checkpoint
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
index 6b2b73c..77cb9b6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccessTest.java
@@ -230,7 +230,7 @@ public class FsCheckpointStorageAccessTest extends
AbstractFileCheckpointStorage
assertFalse(baseDir.exists());
// mkdirs would only be called when initializeBaseLocations
- storage.initializeBaseLocations();
+ storage.initializeBaseLocationsForCheckpoint();
assertTrue(baseDir.exists());
// mkdir would not be called when resolveCheckpointStorageLocation
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
index b7b6424..a31a09c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsStateBackendEntropyTest.java
@@ -61,7 +61,7 @@ public class FsStateBackendEntropyTest {
final FsCheckpointStorageAccess storage =
new FsCheckpointStorageAccess(
fs, checkpointDir, null, new JobID(),
fileSizeThreshold, 4096);
- storage.initializeBaseLocations();
+ storage.initializeBaseLocationsForCheckpoint();
final FsCheckpointStorageLocation location =
(FsCheckpointStorageLocation)
storage.initializeLocationForCheckpoint(96562);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
index 4cf3464..9c57d0f 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -89,7 +89,7 @@ public class BackendForTestStream extends MemoryStateBackend {
}
@Override
- public void initializeBaseLocations() {
+ public void initializeBaseLocationsForCheckpoint() {
throw new UnsupportedOperationException();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
index da661d2..a71d9bc 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockCheckpointStorage.java
@@ -55,7 +55,7 @@ public class MockCheckpointStorage implements
CheckpointStorage {
}
@Override
- public void initializeBaseLocations() {}
+ public void initializeBaseLocationsForCheckpoint() {}
@Override
public CheckpointStorageLocation
initializeLocationForCheckpoint(long checkpointId) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
index 973325e..0de86e1 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/sorted/state/NonCheckpointingStorageAccess.java
@@ -45,7 +45,7 @@ class NonCheckpointingStorageAccess implements
CheckpointStorageAccess {
}
@Override
- public void initializeBaseLocations() {}
+ public void initializeBaseLocationsForCheckpoint() {}
@Override
public CheckpointStorageLocation initializeLocationForCheckpoint(long
checkpointId) {
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 160fea2..42077d8 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -115,6 +115,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
@@ -135,6 +136,7 @@ import static
org.apache.flink.util.ExceptionUtils.assertThrowableWithMessage;
import static org.apache.flink.util.ExceptionUtils.findThrowable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -515,6 +517,50 @@ public class SavepointITCase extends TestLogger {
}
}
+ @Test
+ public void testTriggerSavepointWithoutCheckpointBaseLocations() throws
Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getCheckpointConfig().disableCheckpointing();
+ env.setParallelism(1);
+
+ env.addSource(new IntegerStreamSource()).addSink(new
DiscardingSink<>());
+
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+ Configuration config = getFileBasedCheckpointsConfig();
+ config.addAll(jobGraph.getJobConfiguration());
+
+ MiniClusterWithClientResource cluster =
+ new MiniClusterWithClientResource(
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(config)
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(1)
+ .build());
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
+
+ String savepointPath = null;
+ try {
+ client.submitJob(jobGraph).get();
+
+ waitForAllTaskRunning(cluster.getMiniCluster(),
jobGraph.getJobID(), false);
+
+ savepointPath = client.triggerSavepoint(jobGraph.getJobID(),
null).get();
+
+ assertNotNull(savepointPath);
+
+ client.cancel(jobGraph.getJobID()).get();
+ // checkpoint directory should not be initialized
+ assertEquals(0,
Objects.requireNonNull(checkpointDir.listFiles()).length);
+ } finally {
+ if (null != savepointPath) {
+ client.disposeSavepoint(savepointPath);
+ }
+ cluster.after();
+ }
+ }
+
static class BoundedPassThroughOperator<T> extends
AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T>, BoundedOneInput {
static volatile CountDownLatch progressLatch;