[
https://issues.apache.org/jira/browse/FLINK-4067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15361352#comment-15361352
]
ASF GitHub Bot commented on FLINK-4067:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69461561
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
---
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A file system based {@link SavepointStore}.
+ *
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ * - MagicNumber => int
+ * - SavepointVersion => int (returned by Savepoint#getVersion())
+ * - Savepoint => bytes (serialized via version-specific
SavepointSerializer)
+ * </pre>
+ */
+public class FsSavepointStore implements SavepointStore {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FsSavepointStore.class);
+
+ /** Magic number for sanity checks against stored savepoints. */
+ int MAGIC_NUMBER = 0x4960672d;
+
+ /** Root path for savepoints. */
+ private final Path rootPath;
+
+ /** Prefix for savepoint files. */
+ private final String prefix;
+
+ /** File system to use for file access. */
+ private final FileSystem fileSystem;
+
+ /**
+ * Creates a new file system based {@link SavepointStore}.
+ *
+ * @param rootPath Root path for savepoints
+ * @param prefix Prefix for savepoint files
+ * @throws IOException On failure to access root path
+ */
+ FsSavepointStore(String rootPath, String prefix) throws IOException {
+ this.rootPath = new Path(checkNotNull(rootPath, "Root path"));
+ this.prefix = checkNotNull(prefix, "Prefix");
+
+ this.fileSystem = FileSystem.get(this.rootPath.toUri());
+ }
+
+ @Override
+ public <T extends Savepoint> String storeSavepoint(T savepoint) throws
IOException {
+ Preconditions.checkNotNull(savepoint, "Savepoint");
+
+ Exception latestException = null;
+ Path path = null;
+ FSDataOutputStream fdos = null;
+
+ // Try to create a FS output stream
+ for (int attempt = 0; attempt < 10; attempt++) {
+ path = new Path(rootPath,
FileUtils.getRandomFilename(prefix));
+ try {
+ fdos = fileSystem.create(path, false);
+ break;
+ } catch (Exception e) {
+ latestException = e;
+ }
+ }
+
+ if (fdos == null) {
+ throw new IOException("Failed to create file output
stream at " + path, latestException);
+ }
+
+ boolean success = false;
+ try (DataOutputStream dos = new DataOutputStream(fdos)) {
+ // Write header
+ dos.writeInt(MAGIC_NUMBER);
+ dos.writeInt(savepoint.getVersion());
+
+ // Write savepoint
+ SavepointSerializer<T> serializer =
SavepointSerializers.getSerializer(savepoint);
+ serializer.serialize(savepoint, fdos);
+ success = true;
+ } finally {
+ fdos.close();
+
+ if (!success && fileSystem.exists(path)) {
+ if (!fileSystem.delete(path, true)) {
+ LOG.warn("Failed to delete file " +
path + " after failed write.");
+ }
+ }
+ }
+
+ return path.toString();
+ }
+
+ @Override
+ public Savepoint loadSavepoint(String path) throws IOException {
+ Preconditions.checkNotNull(path, "Path");
+
+ try {
+ try (FSDataInputStream fdis = createFsInputStream(new
Path(path))) {
+ try (DataInputStream dis = new
DataInputStream(fdis)) {
+ int magicNumber = dis.readInt();
+ int version = dis.readInt();
+
+ if (magicNumber == MAGIC_NUMBER) {
+ SavepointSerializer<?>
serializer = SavepointSerializers.getSerializer(version);
+ Savepoint savepoint =
serializer.deserialize(fdis);
+ return savepoint;
+ } else {
+ throw new
IllegalStateException("Unexpected magic number. This indicates " +
+ "that the
specified file is not a proper savepoint or " +
+ "the file has
been corrupted.");
+ }
+ }
+ }
+ } catch (Throwable t) {
+ // Flink 1.0 did not store a header, check if it is an
old savepoint.
+ // Only after this fails as well can we be sure that it
is an actual
+ // failure.
+ Savepoint savepoint = tryLoadSavepointV0(path);
+ if (savepoint == null) {
+ throw new IOException("Failed to load savepoint
" + path + ".", t);
+ } else {
+ return savepoint;
+ }
+ }
+ }
+
+ @Override
+ public void disposeSavepoint(String path, ClassLoader classLoader)
throws Exception {
+ Preconditions.checkNotNull(path, "Path");
+ Preconditions.checkNotNull(classLoader, "Class loader");
+
+ try {
+ Savepoint savepoint = loadSavepoint(path);
+ savepoint.dispose(classLoader);
+
+ Path filePath = new Path(path);
+
+ if (fileSystem.exists(filePath)) {
+ if (!fileSystem.delete(filePath, true)) {
+ throw new IOException("Failed to delete
" + filePath + ".");
+ }
+ } else {
+ throw new IllegalArgumentException("Invalid
path '" + filePath.toUri() + "'.");
+ }
+ } catch (Throwable t) {
+ throw new IOException("Failed to dispose savepoint " +
path + ".", t);
+ }
+ }
+
+ @Override
+ public void shutDown() throws Exception {
+ // Nothing to do, because the savepoint life-cycle is
independent of
+ // the cluster life-cycle.
+ }
+
+ private FSDataInputStream createFsInputStream(Path path) throws
IOException {
+ if (fileSystem.exists(path)) {
+ return fileSystem.open(path);
+ } else {
+ throw new IllegalArgumentException("Invalid path '" +
path.toUri() + "'.");
+ }
+ }
+
+ /**
+ * Tries to load a Flink 1.0 savepoint, which was not stored with a
header.
+ *
+ * <p>This can be removed for Flink 1.2.
--- End diff --
Why can we remove this for Flink 1.2? Wouldn't that mean that Flink 1.2 can
only restore savepoints from version 1.1 onwards? I think it should also be
possible to restore savepoints from version 1.0 with version 1.2.
> Add version header to savepoints
> --------------------------------
>
> Key: FLINK-4067
> URL: https://issues.apache.org/jira/browse/FLINK-4067
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.0.3
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Fix For: 1.1.0
>
>
> Adding a header with version information to savepoints ensures that we can
> migrate savepoints between Flink versions in the future (for example when
> changing internal serialization formats between versions).
> After talking with Till, we propose to add the following meta data:
> - Magic number (int): identify data as savepoint
> - Version (int): savepoint version (independent of Flink version)
> - Data Offset (int): specifies at which point the actual savepoint data
> starts. With this, we can allow future Flink versions to add fields to the
> header without breaking stuff, e.g. Flink 1.1 could read savepoints of Flink
> 2.0.
> For Flink 1.0 savepoint support, we have to try reading the savepoints
> without a header before failing if we don't find the magic number.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)