[
https://issues.apache.org/jira/browse/FLINK-4067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364298#comment-15364298
]
ASF GitHub Bot commented on FLINK-4067:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69728549
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
---
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StateForTask;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class FsSavepointStoreTest {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Tests a store-load-dispose sequence.
+ */
+ @Test
+ public void testStoreLoadDispose() throws Exception {
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ assertEquals(0, tmp.getRoot().listFiles().length);
+
+ // Store
+ SavepointV1 stored = new SavepointV1(1929292,
SavepointV1Test.createTaskStates(4, 24));
+ String path = store.storeSavepoint(stored);
+ assertEquals(1, tmp.getRoot().listFiles().length);
+
+ // Load
+ Savepoint loaded = store.loadSavepoint(path);
+ assertEquals(stored, loaded);
+
+ // Dispose
+ store.disposeSavepoint(path,
ClassLoader.getSystemClassLoader());
+
+ assertEquals(0, tmp.getRoot().listFiles().length);
+ }
+
+ /**
+ * Tests loading of a Flink 1.0 savepoint.
+ */
+ @Test
+ public void testLoadFlink10Savepoint() throws Exception {
+ // Copied from Flink 1.0
+ CompletedCheckpoint checkpoint =
SavepointV0Test.createCompletedCheckpoint(
+ new JobID(),
+ 10210230,
+ System.currentTimeMillis(),
+ System.currentTimeMillis(),
+ 32);
+
+ // Copied from Flink 1.0 savepoint serialization code path
+ Path filePath = new Path(tmp.getRoot().getPath(),
FileUtils.getRandomFilename("fs-savepoint-store-test-"));
+ FSDataOutputStream outStream =
FileSystem.get(filePath.toUri()).create(filePath, false);
+ try (ObjectOutputStream os = new ObjectOutputStream(outStream))
{
+ os.writeObject(checkpoint);
+ }
+
+ // Load with savepoint store
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ SavepointV0 savepoint = (SavepointV0)
store.loadSavepoint(filePath.toString());
+
+ // Verify all expected task states
+ for (StateForTask stateForTask : checkpoint.getStates()) {
+ JobVertexID expectedJobVertexId =
stateForTask.getOperatorId();
+ int subTaskIndex = stateForTask.getSubtask();
+
+ boolean foundMatch = false;
+ for (TaskState taskState : savepoint.getTaskStates()) {
+ if
(taskState.getJobVertexID().equals(expectedJobVertexId)) {
+ SubtaskState subtaskState =
taskState.getState(subTaskIndex);
+ assertNotNull(subtaskState);
+ assertEquals(stateForTask.getState(),
subtaskState.getState());
+ foundMatch = true;
+ break;
+ }
+ }
+
+ assertTrue("Did not find TaskState for " +
stateForTask, foundMatch);
+ }
+ }
+
+ /**
+ * Tests loading with unexpected file types.
+ */
+ @Test
+ public void testUnexpectedFileType() throws Exception {
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+
+ // Random file
+ Path filePath = new Path(tmp.getRoot().getPath(),
UUID.randomUUID().toString());
+ FSDataOutputStream fdos =
FileSystem.get(filePath.toUri()).create(filePath, false);
+ DataOutputStream dos = new DataOutputStream(fdos);
+ for (int i = 0; i < 10; i++) {
+ dos.writeLong(ThreadLocalRandom.current().nextLong());
+ }
+
+ try {
+ store.loadSavepoint(filePath.toString());
+ fail("Did not throw expected Exception");
+ } catch (IOException e) {
+ // Unexpected magic number
+ assertTrue(e.getCause() instanceof
IllegalStateException);
+ }
+
+ // Very short file (EOFException)
+ filePath = new Path(tmp.getRoot().getPath(),
UUID.randomUUID().toString());
+ fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ dos = new DataOutputStream(fdos);
+
+ dos.writeByte(0);
+
+ try {
+ store.loadSavepoint(filePath.toString());
+ fail("Did not throw expected Exception");
+ } catch (IOException e) {
+ // Unexpected magic number
+ assertTrue(e.getCause() instanceof EOFException);
+ }
+ }
+
+ /**
+ * Tests addition of a new savepoint version.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleSavepointVersions() throws Exception {
+ Field field =
SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+ field.setAccessible(true);
+ Map<Integer, SavepointSerializer<?>> serializers =
(Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+ assertTrue(serializers.size() >= 1);
+
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ assertEquals(0, tmp.getRoot().listFiles().length);
+
+ // New savepoint type for test
+ int version = ThreadLocalRandom.current().nextInt();
+ long checkpointId = ThreadLocalRandom.current().nextLong();
+
+ // Add serializer
+ serializers.put(version, NewSavepointSerializer.INSTANCE);
+
+ TestSavepoint newSavepoint = new TestSavepoint(version,
checkpointId);
+ String pathNewSavepoint = store.storeSavepoint(newSavepoint);
+ assertEquals(1, tmp.getRoot().listFiles().length);
+
+ // Savepoint v0
--- End diff --
True
> Add version header to savepoints
> --------------------------------
>
> Key: FLINK-4067
> URL: https://issues.apache.org/jira/browse/FLINK-4067
> Project: Flink
> Issue Type: Improvement
> Affects Versions: 1.0.3
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
> Fix For: 1.1.0
>
>
> Adding a header with version information to savepoints ensures that we can
> migrate savepoints between Flink versions in the future (for example when
> changing internal serialization formats between versions).
> After talking with Till, we propose to add the following metadata:
> - Magic number (int): identify data as savepoint
> - Version (int): savepoint version (independent of Flink version)
> - Data Offset (int): specifies the offset at which the actual savepoint data
> starts. This allows future Flink versions to add fields to the header
> without breaking compatibility, e.g. Flink 1.1 could still read savepoints
> written by Flink 2.0.
> To support Flink 1.0 savepoints, if we don't find the magic number we have
> to fall back to reading the file as a header-less savepoint before failing.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)