[
https://issues.apache.org/jira/browse/FLINK-4067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361320#comment-15361320
]
ASF GitHub Bot commented on FLINK-4067:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69458362
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java ---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based {@link SavepointStore}.
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException {
+ this.rootPath = new Path(checkNotNull(rootPath, "Root path"));
+ this.prefix = checkNotNull(prefix, "Prefix");
+
+ this.fileSystem = FileSystem.get(this.rootPath.toUri());
+ }
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
--- End diff --
What usually causes `fileSystem.create` to fail, and why did you choose
9 retries (10 attempts)?
> Add version header to savepoints
> --------------------------------
>
> Key: FLINK-4067
> URL: https://issues.apache.org/jira/browse/FLINK-4067
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.0.3
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Fix For: 1.1.0
>
>
> Adding a header with version information to savepoints ensures that we can
> migrate savepoints between Flink versions in the future (for example when
> changing internal serialization formats between versions).
> After talking with Till, we propose to add the following metadata:
> - Magic number (int): identify data as savepoint
> - Version (int): savepoint version (independent of Flink version)
> - Data Offset (int): specifies the offset at which the actual savepoint data
> starts. With this, future Flink versions can add fields to the header
> without breaking older readers, e.g. Flink 1.1 could read savepoints of Flink
> 2.0.
> For Flink 1.0 savepoint support, if we don't find the magic number, we have
> to try reading the savepoint without a header before failing.
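
The header proposed in the issue description could be sketched roughly as follows. This is an illustration only, not the code in the pull request: the class `SavepointHeaderSketch`, its `Parsed` type, and the `-1` marker for headerless savepoints are hypothetical names; the data offset is assumed to be counted from the start of the file; and the sketch uses an in-memory `ByteBuffer` rather than the `DataInputStream`/`DataOutputStream` classes imported in the diff. Only the magic number value is taken from the diff.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical sketch of the proposed header; not the FsSavepointStore code. */
final class SavepointHeaderSketch {

    /** Magic number as it appears in the diff. */
    static final int MAGIC_NUMBER = 0x4960672d;

    /** Size of the three int fields: magic number, version, data offset. */
    static final int HEADER_SIZE = 3 * Integer.BYTES;

    /** Assumed marker for savepoints without a header (e.g. Flink 1.0). */
    static final int NO_HEADER = -1;

    /** Parsed result: savepoint version plus the raw savepoint bytes. */
    static final class Parsed {
        final int version;
        final byte[] data;

        Parsed(int version, byte[] data) {
            this.version = version;
            this.data = data;
        }
    }

    /** Prepends magic number, version, and data offset to the savepoint bytes. */
    static byte[] write(int version, byte[] data) {
        return ByteBuffer.allocate(HEADER_SIZE + data.length)
                .putInt(MAGIC_NUMBER)
                .putInt(version)
                .putInt(HEADER_SIZE) // assumption: offset counted from file start
                .put(data)
                .array();
    }

    /** Reads a stored savepoint, falling back to "no header" without the magic number. */
    static Parsed read(byte[] stored) {
        ByteBuffer buf = ByteBuffer.wrap(stored);
        if (stored.length >= HEADER_SIZE && buf.getInt() == MAGIC_NUMBER) {
            int version = buf.getInt();
            int offset = buf.getInt();
            if (offset < HEADER_SIZE || offset > stored.length) {
                throw new IllegalArgumentException("Invalid data offset: " + offset);
            }
            // Jumping to the offset skips header fields this reader does not know.
            return new Parsed(version, Arrays.copyOfRange(stored, offset, stored.length));
        }
        // No magic number: treat the whole content as a headerless savepoint.
        return new Parsed(NO_HEADER, stored);
    }

    public static void main(String[] args) {
        byte[] stored = write(1, new byte[] {42});
        Parsed parsed = read(stored);
        System.out.println("version=" + parsed.version + ", bytes=" + parsed.data.length);
    }
}
```

Because the reader jumps to the data offset instead of assuming a fixed header length, a later writer could append fields after the offset field and this reader would still locate the savepoint bytes, which is the point of the Data Offset field in the proposal.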