[
https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565540#comment-15565540
]
ASF GitHub Bot commented on FLINK-4512:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2608#discussion_r82796670
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
---
@@ -18,23 +18,105 @@
package org.apache.flink.runtime.checkpoint.savepoint;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
*
- * <p>The main implementation is the {@link FsSavepointStore}. We also
have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink
1.0).
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific
SavepointSerializer)
+ * </pre>
*/
-public interface SavepointStore {
+public class SavepointStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ private static final int MAGIC_NUMBER = 0x4960672d;
+
+ /** Prefix for savepoint files. */
+ private static final String prefix = "savepoint-";
/**
* Stores the savepoint.
*
+ * @param targetDirectory Target directory to store savepoint in
* @param savepoint Savepoint to be stored
* @param <T> Savepoint type
* @return Path of stored savepoint
* @throws Exception Failures during store are forwarded
*/
- <T extends Savepoint> String storeSavepoint(T savepoint) throws
Exception;
+ public static <T extends Savepoint> String storeSavepoint(
+ String targetDirectory,
+ T savepoint) throws IOException {
+
+ checkNotNull(targetDirectory, "Target directory");
+ checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ FileSystem fs = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(targetDirectory,
FileUtils.getRandomFilename(prefix));
+
+ if (fs == null) {
+ fs = FileSystem.get(path.toUri());
+ }
+
+ try {
+ fdos = fs.create(path, false);
+ break;
+ } catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null) {
+ throw new IOException("Failed to create file output
stream at " + path, latestException);
+ }
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos)) {
+ // Write header
+ dos.writeInt(MAGIC_NUMBER);
+ dos.writeInt(savepoint.getVersion());
+
+ // Write savepoint
+ SavepointSerializer<T> serializer =
SavepointSerializers.getSerializer(savepoint);
+ serializer.serialize(savepoint, dos);
+ success = true;
+ } finally {
+ if (!success && fs.exists(path)) {
+ if (!fs.delete(path, true)) {
+ LOG.warn("Failed to delete file " +
path + " after failed write.");
--- End diff --
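Please use the logger placeholder `{}` instead of string concatenation here, e.g. `LOG.warn("Failed to delete file {} after failed write.", path);`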
> Add option for persistent checkpoints
> -------------------------------------
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their metadata.
> This is what we currently do for savepoints, but in the future checkpoints
> and savepoints are likely to diverge with respect to the guarantees they give
> for updatability, etc.
> This means that, in the long term, the difference between persistent
> checkpoints and savepoints will be that persistent checkpoints can only be
> restored with the same job settings (e.g. parallelism).
> Regular and persistent checkpoints should behave differently with respect to
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED):
> regular checkpoints are cleaned up in all of these cases, whereas persistent
> checkpoints are cleaned up only on FINISHED. Optionally, the behaviour on
> CANCELLED or FAILED could be made configurable.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)