[ 
https://issues.apache.org/jira/browse/FLINK-4067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15364299#comment-15364299
 ] 

ASF GitHub Bot commented on FLINK-4067:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2194#discussion_r69728577
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
 ---
    @@ -0,0 +1,308 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.checkpoint.savepoint;
    +
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.core.fs.FSDataOutputStream;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    +import org.apache.flink.runtime.checkpoint.StateForTask;
    +import org.apache.flink.runtime.checkpoint.SubtaskState;
    +import org.apache.flink.runtime.checkpoint.TaskState;
    +import org.apache.flink.runtime.jobgraph.JobVertexID;
    +import org.apache.flink.util.FileUtils;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +import org.mockito.Matchers;
    +
    +import java.io.DataInputStream;
    +import java.io.DataOutputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.OutputStream;
    +import java.lang.reflect.Field;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.ThreadLocalRandom;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertTrue;
    +import static org.junit.Assert.fail;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.doThrow;
    +import static org.mockito.Mockito.mock;
    +
    +public class FsSavepointStoreTest {
    +
    +   @Rule
    +   public TemporaryFolder tmp = new TemporaryFolder();
    +
    +   /**
    +    * Tests a store-load-dispose sequence.
    +    */
    +   @Test
    +   public void testStoreLoadDispose() throws Exception {
    +           FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
    +           assertEquals(0, tmp.getRoot().listFiles().length);
    +
    +           // Store
    +           SavepointV1 stored = new SavepointV1(1929292, 
SavepointV1Test.createTaskStates(4, 24));
    +           String path = store.storeSavepoint(stored);
    +           assertEquals(1, tmp.getRoot().listFiles().length);
    +
    +           // Load
    +           Savepoint loaded = store.loadSavepoint(path);
    +           assertEquals(stored, loaded);
    +
    +           // Dispose
    +           store.disposeSavepoint(path, 
ClassLoader.getSystemClassLoader());
    +
    +           assertEquals(0, tmp.getRoot().listFiles().length);
    +   }
    +
    +   /**
    +    * Tests loading of a Flink 1.0 savepoint.
    +    */
    +   @Test
    +   public void testLoadFlink10Savepoint() throws Exception {
    +           // Copied from Flink 1.0
    +           CompletedCheckpoint checkpoint = 
SavepointV0Test.createCompletedCheckpoint(
    +                           new JobID(),
    +                           10210230,
    +                           System.currentTimeMillis(),
    +                           System.currentTimeMillis(),
    +                           32);
    +
    +           // Copied from Flink 1.0 savepoint serialization code path
    +           Path filePath = new Path(tmp.getRoot().getPath(), 
FileUtils.getRandomFilename("fs-savepoint-store-test-"));
    +           FSDataOutputStream outStream = 
FileSystem.get(filePath.toUri()).create(filePath, false);
    +           try (ObjectOutputStream os = new ObjectOutputStream(outStream)) 
{
    +                   os.writeObject(checkpoint);
    +           }
    +
    +           // Load with savepoint store
    +           FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
    +           SavepointV0 savepoint = (SavepointV0) 
store.loadSavepoint(filePath.toString());
    +
    +           // Verify all expected task states
    +           for (StateForTask stateForTask : checkpoint.getStates()) {
    +                   JobVertexID expectedJobVertexId = 
stateForTask.getOperatorId();
    +                   int subTaskIndex = stateForTask.getSubtask();
    +
    +                   boolean foundMatch = false;
    +                   for (TaskState taskState : savepoint.getTaskStates()) {
    +                           if 
(taskState.getJobVertexID().equals(expectedJobVertexId)) {
    +                                   SubtaskState subtaskState = 
taskState.getState(subTaskIndex);
    +                                   assertNotNull(subtaskState);
    +                                   assertEquals(stateForTask.getState(), 
subtaskState.getState());
    +                                   foundMatch = true;
    +                                   break;
    +                           }
    +                   }
    +
    +                   assertTrue("Did not find TaskState for " + 
stateForTask, foundMatch);
    +           }
    +   }
    +
    +   /**
    +    * Tests loading with unexpected file types.
    +    */
    +   @Test
    +   public void testUnexpectedFileType() throws Exception {
    +           FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
    +
    +           // Random file
    +           Path filePath = new Path(tmp.getRoot().getPath(), 
UUID.randomUUID().toString());
    +           FSDataOutputStream fdos = 
FileSystem.get(filePath.toUri()).create(filePath, false);
    +           DataOutputStream dos = new DataOutputStream(fdos);
    +           for (int i = 0; i < 10; i++) {
    +                   dos.writeLong(ThreadLocalRandom.current().nextLong());
    +           }
    +
    +           try {
    +                   store.loadSavepoint(filePath.toString());
    +                   fail("Did not throw expected Exception");
    +           } catch (IOException e) {
    +                   // Unexpected magic number
    +                   assertTrue(e.getCause() instanceof 
IllegalStateException);
    +           }
    +
    +           // Very short file (EOFException)
    +           filePath = new Path(tmp.getRoot().getPath(), 
UUID.randomUUID().toString());
    +           fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
    +           dos = new DataOutputStream(fdos);
    +
    +           dos.writeByte(0);
    +
    +           try {
    +                   store.loadSavepoint(filePath.toString());
    +                   fail("Did not throw expected Exception");
    +           } catch (IOException e) {
    +                   // Unexpected magic number
    +                   assertTrue(e.getCause() instanceof EOFException);
    +           }
    +   }
    +
    +   /**
    +    * Tests addition of a new savepoint version.
    +    */
    +   @Test
    +   @SuppressWarnings("unchecked")
    +   public void testMultipleSavepointVersions() throws Exception {
    +           Field field = 
SavepointSerializers.class.getDeclaredField("SERIALIZERS");
    +           field.setAccessible(true);
    +           Map<Integer, SavepointSerializer<?>> serializers = 
(Map<Integer, SavepointSerializer<?>>) field.get(null);
    +
    +           assertTrue(serializers.size() >= 1);
    +
    +           FsSavepointStore store = new 
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
    +           assertEquals(0, tmp.getRoot().listFiles().length);
    +
    +           // New savepoint type for test
    +           int version = ThreadLocalRandom.current().nextInt();
    +           long checkpointId = ThreadLocalRandom.current().nextLong();
    +
    +           // Add serializer
    +           serializers.put(version, NewSavepointSerializer.INSTANCE);
    +
    +           TestSavepoint newSavepoint = new TestSavepoint(version, 
checkpointId);
    +           String pathNewSavepoint = store.storeSavepoint(newSavepoint);
    +           assertEquals(1, tmp.getRoot().listFiles().length);
    +
    +           // Savepoint v0
    +           Savepoint savepoint = new SavepointV1(checkpointId, 
SavepointV1Test.createTaskStates(4, 32));
    +           String pathSavepoint = store.storeSavepoint(savepoint);
    +           assertEquals(2, tmp.getRoot().listFiles().length);
    +
    +           // Load
    +           Savepoint loaded = store.loadSavepoint(pathNewSavepoint);
    +           assertEquals(newSavepoint, loaded);
    +
    +           loaded = store.loadSavepoint(pathSavepoint);
    +           assertEquals(savepoint, loaded);
    +   }
    +
    +   /**
    +    * Tests that an exception during store cleans up any created files.
    --- End diff --
    
    Misleading formulation in the comment; that's exactly what's tested.


> Add version header to savepoints
> --------------------------------
>
>                 Key: FLINK-4067
>                 URL: https://issues.apache.org/jira/browse/FLINK-4067
>             Project: Flink
>          Issue Type: Improvement
>    Affects Versions: 1.0.3
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>             Fix For: 1.1.0
>
>
> Adding a header with version information to savepoints ensures that we can 
> migrate savepoints between Flink versions in the future (for example when 
> changing internal serialization formats between versions).
> After talking with Till, we propose to add the following metadata:
> - Magic number (int): identify data as savepoint
> - Version (int): savepoint version (independent of Flink version)
> - Data Offset (int): specifies at which point the actual savepoint data 
> starts. With this, we can allow future Flink versions to add fields to the 
> header without breaking stuff, e.g. Flink 1.1 could read savepoints of Flink 
> 2.0.
> For Flink 1.0 savepoint support, we have to try reading the savepoints 
> without a header before failing if we don't find the magic number.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to