This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new c601c70e5c4  [FLINK-32440][checkpoint] Introduce file merging configuration (#22973)
c601c70e5c4 is described below

commit c601c70e5c4a9316bee049c613c011b14fab7f5e
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Mon Apr 15 14:21:40 2024 +0800

    [FLINK-32440][checkpoint] Introduce file merging configuration (#22973)
---
 .../generated/checkpoint_file_merging_section.html |  36 +++++++
 .../generated/checkpointing_configuration.html     |  24 +++++
 .../flink/annotation/docs/Documentation.java       |   2 +
 .../flink/configuration/CheckpointingOptions.java  | 107 +++++++++++++++++++++
 .../FileMergingSnapshotManagerBuilder.java         |   4 +-
 .../runtime/state/CheckpointStorageWorkerView.java |   2 +-
 .../state/TaskExecutorFileMergingManager.java      |  45 ++++++++-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   5 +-
 .../state/TaskExecutorFileMergingManagerTest.java  |  14 ++-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  25 ++++-
 10 files changed, 248 insertions(+), 16 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html
new file mode 100644
index 00000000000..50ee139e0ee
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/checkpoint_file_merging_section.html
@@ -0,0 +1,36 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. This is an experimental feature under evaluation, make sure you're aware of the possible effects of enabling it.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-file-size</h5></td>
+            <td style="word-wrap: break-word;">32 mb</td>
+            <td>MemorySize</td>
+            <td>Max size of a physical file for merged checkpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.pool-blocking</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently.
When poll a small file from a Blocking pool, it may be blocked until the file is returned.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/layouts/shortcodes/generated/checkpointing_configuration.html b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
index c87a9c33803..3b9fda388e4 100644
--- a/docs/layouts/shortcodes/generated/checkpointing_configuration.html
+++ b/docs/layouts/shortcodes/generated/checkpointing_configuration.html
@@ -44,6 +44,30 @@
             <td>String</td>
             <td>The default directory used for storing the data files and meta data of checkpoints in a Flink supported filesystem. The storage path must be accessible from all participating processes/nodes(i.e. all TaskManagers and JobManagers). If the 'state.checkpoint-storage' is set to 'jobmanager', only the meta data of checkpoints will be stored in this directory.</td>
         </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.across-checkpoint-boundary</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Only relevant if <code class="highlighter-rouge">state.checkpoints.file-merging.enabled</code> is enabled.<br />Whether to allow merging data of multiple checkpoints into one physical file. If this option is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for the logical files of different checkpoints to share the same physical file.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable merging multiple checkpoint files into one, which will greatly reduce the number of small checkpoint files. This is an experimental feature under evaluation, make sure you're aware of the possible effects of enabling it.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.max-file-size</h5></td>
+            <td style="word-wrap: break-word;">32 mb</td>
+            <td>MemorySize</td>
+            <td>Max size of a physical file for merged checkpoints.</td>
+        </tr>
+        <tr>
+            <td><h5>state.checkpoints.file-merging.pool-blocking</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently.
When poll a small file from a Blocking pool, it may be blocked until the file is returned.</td> + </tr> <tr> <td><h5>state.checkpoints.num-retained</h5></td> <td style="word-wrap: break-word;">1</td> diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java index a66975eb38f..348f520710c 100644 --- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java +++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java @@ -107,6 +107,8 @@ public final class Documentation { public static final String TRACE_REPORTERS = "trace_reporters"; + public static final String CHECKPOINT_FILE_MERGING = "checkpoint_file_merging"; + private Sections() {} } diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java index 5626da3d467..bdde9423636 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CheckpointingOptions.java @@ -18,6 +18,7 @@ package org.apache.flink.configuration; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.docs.Documentation; import org.apache.flink.configuration.description.Description; import org.apache.flink.configuration.description.TextElement; @@ -331,4 +332,110 @@ public class CheckpointingOptions { + StateRecoveryOptions.LOCAL_RECOVERY.key() + ". By default, local backup is deactivated. Local backup currently only " + "covers keyed state backends (including both the EmbeddedRocksDBStateBackend and the HashMapStateBackend)."); + + // ------------------------------------------------------------------------ + // Options related to file merging + // ------------------------------------------------------------------------ + + /** + * Whether to enable merging multiple checkpoint files into one, which will greatly reduce the + * number of small checkpoint files. See FLIP-306 for details. + * + * <p>Note: This is an experimental feature under evaluation, make sure you're aware of the + * possible effects of enabling it. + */ + @Experimental + @Documentation.Section(value = Documentation.Sections.CHECKPOINT_FILE_MERGING, position = 1) + public static final ConfigOption<Boolean> FILE_MERGING_ENABLED = + ConfigOptions.key("state.checkpoints.file-merging.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to enable merging multiple checkpoint files into one, which will greatly reduce" + + " the number of small checkpoint files. This is an experimental feature under evaluation, " + + "make sure you're aware of the possible effects of enabling it."); + + /** + * Whether to allow merging data of multiple checkpoints into one physical file. If this option + * is set to false, only merge files within checkpoint boundaries. Otherwise, it is possible for + * the logical files of different checkpoints to share the same physical file. 
+ */ + @Experimental + @Documentation.Section(value = Documentation.Sections.CHECKPOINT_FILE_MERGING, position = 2) + public static final ConfigOption<Boolean> FILE_MERGING_ACROSS_BOUNDARY = + ConfigOptions.key("state.checkpoints.file-merging.across-checkpoint-boundary") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "Only relevant if %s is enabled.", + TextElement.code(FILE_MERGING_ENABLED.key())) + .linebreak() + .text( + "Whether to allow merging data of multiple checkpoints into one physical file. " + + "If this option is set to false, " + + "only merge files within checkpoint boundaries. " + + "Otherwise, it is possible for the logical files of different " + + "checkpoints to share the same physical file.") + .build()); + + /** The max size of a physical file for merged checkpoints. */ + @Experimental + @Documentation.Section(value = Documentation.Sections.CHECKPOINT_FILE_MERGING, position = 3) + public static final ConfigOption<MemorySize> FILE_MERGING_MAX_FILE_SIZE = + ConfigOptions.key("state.checkpoints.file-merging.max-file-size") + .memoryType() + .defaultValue(MemorySize.parse("32MB")) + .withDescription("Max size of a physical file for merged checkpoints."); + + /** + * Whether to use Blocking or Non-Blocking pool for merging physical files. A Non-Blocking pool + * will always provide usable physical file without blocking. It may create many physical files + * if poll file frequently. When poll a small file from a Blocking pool, it may be blocked until + * the file is returned. + */ + @Experimental + @Documentation.Section(value = Documentation.Sections.CHECKPOINT_FILE_MERGING, position = 4) + public static final ConfigOption<Boolean> FILE_MERGING_POOL_BLOCKING = + ConfigOptions.key("state.checkpoints.file-merging.pool-blocking") + .booleanType() + .defaultValue(false) + .withDescription( + "Whether to use Blocking or Non-Blocking pool for merging physical files. " + + "A Non-Blocking pool will always provide usable physical file without blocking. It may create many physical files if poll file frequently. " + + "When poll a small file from a Blocking pool, it may be blocked until the file is returned."); + + /** + * The upper limit of the file pool size based on the number of subtasks within each TM (only + * for merging private state at Task Manager level). + * + * <p>TODO: remove '@Documentation.ExcludeFromDocumentation' after the feature is implemented. + */ + @Experimental @Documentation.ExcludeFromDocumentation + public static final ConfigOption<Integer> FILE_MERGING_MAX_SUBTASKS_PER_FILE = + ConfigOptions.key("state.checkpoints.file-merging.max-subtasks-per-file") + .intType() + .defaultValue(4) + .withDescription( + "The upper limit of the file pool size based on the number of subtasks within each TM" + + "(only for merging private state at Task Manager level)."); + + /** + * Space amplification stands for the magnification of the occupied space compared to the amount + * of valid data. The more space amplification is, the more waste of space will be. This configs + * a space amplification above which a re-uploading for physical files will be triggered to + * reclaim space. + * + * <p>TODO: remove '@Documentation.ExcludeFromDocumentation' after the feature is implemented. 
+ */ + @Experimental @Documentation.ExcludeFromDocumentation + public static final ConfigOption<Float> FILE_MERGING_MAX_SPACE_AMPLIFICATION = + ConfigOptions.key("state.checkpoints.file-merging.max-space-amplification") + .floatType() + .defaultValue(2f) + .withDescription( + "Space amplification stands for the magnification of the occupied space compared to the amount of valid data. " + + "The more space amplification is, the more waste of space will be. This configs a space amplification " + + "above which a re-uploading for physical files will be triggered to reclaim space."); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java index 3a99133bf20..0004a5a9ec0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java @@ -32,10 +32,10 @@ public class FileMergingSnapshotManagerBuilder { /** The file merging type. */ private final FileMergingType fileMergingType; - /** Max size for a file. TODO: Make it configurable. */ + /** Max size for a file. */ private long maxFileSize = 32 * 1024 * 1024; - /** Type of physical file pool. TODO: Make it configurable. */ + /** Type of physical file pool. */ private PhysicalFilePool.Type filePoolType = PhysicalFilePool.Type.NON_BLOCKING; @Nullable private Executor ioExecutor = null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java index 8817d71fce5..06a2bbeec02 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java @@ -80,7 +80,7 @@ public interface CheckpointStorageWorkerView { /** * Return {@link org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess} if - * file merging is enabled Otherwise, return itself. File merging is supported by subclasses of + * file merging is enabled. Otherwise, return itself. File merging is supported by subclasses of * {@link org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess}. 
*/ default CheckpointStorageWorkerView toFileMergingStorage( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java index 2b48d8a07e7..f2278a43984 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java @@ -17,9 +17,12 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MemorySize; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType; +import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFilePool; import org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; @@ -32,6 +35,11 @@ import javax.annotation.concurrent.GuardedBy; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY; +import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED; +import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_MAX_FILE_SIZE; +import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_POOL_BLOCKING; + /** * There is one {@link FileMergingSnapshotManager} for each job per task manager. This class holds * all {@link FileMergingSnapshotManager} objects for a task executor (manager). @@ -67,20 +75,49 @@ public class TaskExecutorFileMergingManager { * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask}. */ public @Nullable FileMergingSnapshotManager fileMergingSnapshotManagerForJob( - @Nonnull JobID jobId) { + @Nonnull JobID jobId, + Configuration clusterConfiguration, + Configuration jobConfiguration) { + boolean mergingEnabled = + jobConfiguration + .getOptional(FILE_MERGING_ENABLED) + .orElse(clusterConfiguration.get(FILE_MERGING_ENABLED)); synchronized (lock) { if (closed) { throw new IllegalStateException( "TaskExecutorFileMergingManager is already closed and cannot " + "register a new FileMergingSnapshotManager."); } + if (!mergingEnabled) { + return null; + } FileMergingSnapshotManager fileMergingSnapshotManager = fileMergingSnapshotManagerByJobId.get(jobId); if (fileMergingSnapshotManager == null) { - // TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration + FileMergingType fileMergingType = + jobConfiguration + .getOptional(FILE_MERGING_ACROSS_BOUNDARY) + .orElse( + clusterConfiguration.get( + FILE_MERGING_ACROSS_BOUNDARY)) + ? 
FileMergingType.MERGE_ACROSS_CHECKPOINT + : FileMergingType.MERGE_WITHIN_CHECKPOINT; + MemorySize maxFileSize = + jobConfiguration + .getOptional(FILE_MERGING_MAX_FILE_SIZE) + .orElse(clusterConfiguration.get(FILE_MERGING_MAX_FILE_SIZE)); + Boolean usingBlockingPool = + jobConfiguration + .getOptional(FILE_MERGING_POOL_BLOCKING) + .orElse(clusterConfiguration.get(FILE_MERGING_POOL_BLOCKING)); + fileMergingSnapshotManager = - new FileMergingSnapshotManagerBuilder( - jobId.toString(), FileMergingType.MERGE_WITHIN_CHECKPOINT) + new FileMergingSnapshotManagerBuilder(jobId.toString(), fileMergingType) + .setMaxFileSize(maxFileSize.getBytes()) + .setFilePoolType( + usingBlockingPool + ? PhysicalFilePool.Type.BLOCKING + : PhysicalFilePool.Type.NON_BLOCKING) .build(); fileMergingSnapshotManagerByJobId.put(jobId, fileMergingSnapshotManager); LOG.info("Registered new file merging snapshot manager for job {}.", jobId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index f26e9ef7610..9eec765fe6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -761,7 +761,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { jobInformation.getJobConfiguration()); final FileMergingSnapshotManager fileMergingSnapshotManager = - fileMergingManager.fileMergingSnapshotManagerForJob(jobId); + fileMergingManager.fileMergingSnapshotManagerForJob( + jobId, + taskManagerConfiguration.getConfiguration(), + jobInformation.getJobConfiguration()); // TODO: Pass config value from user program and do overriding here. final StateChangelogStorage<?> changelogStorage; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java index ca1364b3e1c..5d77e9fe875 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.state; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; @@ -27,6 +28,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link TaskExecutorFileMergingManager}. 
*/ @@ -42,8 +44,12 @@ public class TaskExecutorFileMergingManagerTest { Path checkpointDir1 = new Path(testBaseDir.toString(), "job1"); Path checkpointDir2 = new Path(testBaseDir.toString(), "job2"); int writeBufferSize = 4096; + Configuration jobConfig = new Configuration(); + jobConfig.setBoolean(FILE_MERGING_ENABLED, true); + Configuration clusterConfig = new Configuration(); FileMergingSnapshotManager manager1 = - taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(job1); + taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob( + job1, clusterConfig, jobConfig); manager1.initFileSystem( checkpointDir1.getFileSystem(), checkpointDir1, @@ -51,7 +57,8 @@ public class TaskExecutorFileMergingManagerTest { new Path(checkpointDir1, "taskowned"), writeBufferSize); FileMergingSnapshotManager manager2 = - taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(job1); + taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob( + job1, clusterConfig, jobConfig); manager2.initFileSystem( checkpointDir1.getFileSystem(), checkpointDir1, @@ -59,7 +66,8 @@ public class TaskExecutorFileMergingManagerTest { new Path(checkpointDir1, "taskowned"), writeBufferSize); FileMergingSnapshotManager manager3 = - taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(job2); + taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob( + job2, clusterConfig, jobConfig); manager3.initFileSystem( checkpointDir2.getFileSystem(), checkpointDir2, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index d16bc40a8be..988c19a06e4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -470,7 +470,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> CheckpointStorageAccess checkpointStorageAccess = checkpointStorage.createCheckpointStorage(getEnvironment().getJobID()); checkpointStorageAccess = - applyFileMergingCheckpoint( + tryApplyFileMergingCheckpoint( checkpointStorageAccess, environment.getTaskStateManager().getFileMergingSnapshotManager()); environment.setCheckpointStorageAccess(checkpointStorageAccess); @@ -526,11 +526,26 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } } - private CheckpointStorageAccess applyFileMergingCheckpoint( + private CheckpointStorageAccess tryApplyFileMergingCheckpoint( CheckpointStorageAccess checkpointStorageAccess, - FileMergingSnapshotManager fileMergingSnapshotManager) { - // TODO (FLINK-32440): enable FileMergingCheckpoint by configuration - return checkpointStorageAccess; + @Nullable FileMergingSnapshotManager fileMergingSnapshotManager) { + if (fileMergingSnapshotManager == null) { + return checkpointStorageAccess; + } + try { + CheckpointStorageWorkerView mergingCheckpointStorageAccess = + checkpointStorageAccess.toFileMergingStorage( + fileMergingSnapshotManager, environment); + return (CheckpointStorageAccess) mergingCheckpointStorageAccess; + } catch (IOException e) { + LOG.warn( + "Initiating FsMergingCheckpointStorageAccess failed " + + "with exception: {}, falling back to original checkpoint storage access {}.", + e.getMessage(), + checkpointStorageAccess.getClass(), + e); + return checkpointStorageAccess; + } } private TimerService createTimerService(String timerThreadName) {
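
To try the file merging options introduced by this commit, they can be set in either the cluster configuration or the job configuration; per the new resolution logic in TaskExecutorFileMergingManager, a value set on the job configuration takes precedence over the cluster-level one. Below is a minimal, illustrative Java sketch. The option constants come from the CheckpointingOptions class added in this diff, while the concrete values (64 mb cap, 60 s checkpoint interval) and the StreamExecutionEnvironment wiring are assumptions made only for the example.

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileMergingConfigExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Turn on merging of small checkpoint files (experimental, disabled by default).
        config.set(CheckpointingOptions.FILE_MERGING_ENABLED, true);
        // Illustrative: allow logical files of different checkpoints to share one physical file.
        config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, true);
        // Cap the size of a merged physical file (the default is 32 mb).
        config.set(CheckpointingOptions.FILE_MERGING_MAX_FILE_SIZE, MemorySize.parse("64mb"));
        // Keep the non-blocking physical file pool, which is the default.
        config.set(CheckpointingOptions.FILE_MERGING_POOL_BLOCKING, false);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(60_000L);
        // ... define sources, transformations and sinks, then call env.execute(...) ...
    }
}

The same keys can equally go into the cluster-level Flink configuration file; since the feature is still experimental, leaving the across-checkpoint-boundary and pool-blocking switches at their defaults is the more conservative choice.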