Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2194#discussion_r69522896
--- Diff:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
---
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.StateForTask;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.FileUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Matchers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+public class FsSavepointStoreTest {
+
+ @Rule
+ public TemporaryFolder tmp = new TemporaryFolder();
+
+ /**
+ * Tests a store-load-dispose sequence.
+ */
+ @Test
+ public void testStoreLoadDispose() throws Exception {
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ assertEquals(0, tmp.getRoot().listFiles().length);
+
+ // Store
+ SavepointV1 stored = new SavepointV1(1929292,
SavepointV1Test.createTaskStates(4, 24));
+ String path = store.storeSavepoint(stored);
+ assertEquals(1, tmp.getRoot().listFiles().length);
+
+ // Load
+ Savepoint loaded = store.loadSavepoint(path);
+ assertEquals(stored, loaded);
+
+ // Dispose
+ store.disposeSavepoint(path,
ClassLoader.getSystemClassLoader());
+
+ assertEquals(0, tmp.getRoot().listFiles().length);
+ }
+
+ /**
+ * Tests loading of a Flink 1.0 savepoint.
+ */
+ @Test
+ public void testLoadFlink10Savepoint() throws Exception {
+ // Copied from Flink 1.0
+ CompletedCheckpoint checkpoint =
SavepointV0Test.createCompletedCheckpoint(
+ new JobID(),
+ 10210230,
+ System.currentTimeMillis(),
+ System.currentTimeMillis(),
+ 32);
+
+ // Copied from Flink 1.0 savepoint serialization code path
+ Path filePath = new Path(tmp.getRoot().getPath(),
FileUtils.getRandomFilename("fs-savepoint-store-test-"));
+ FSDataOutputStream outStream =
FileSystem.get(filePath.toUri()).create(filePath, false);
+ try (ObjectOutputStream os = new ObjectOutputStream(outStream))
{
+ os.writeObject(checkpoint);
+ }
+
+ // Load with savepoint store
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ SavepointV0 savepoint = (SavepointV0)
store.loadSavepoint(filePath.toString());
+
+ // Verify all expected task states
+ for (StateForTask stateForTask : checkpoint.getStates()) {
+ JobVertexID expectedJobVertexId =
stateForTask.getOperatorId();
+ int subTaskIndex = stateForTask.getSubtask();
+
+ boolean foundMatch = false;
+ for (TaskState taskState : savepoint.getTaskStates()) {
+ if
(taskState.getJobVertexID().equals(expectedJobVertexId)) {
+ SubtaskState subtaskState =
taskState.getState(subTaskIndex);
+ assertNotNull(subtaskState);
+ assertEquals(stateForTask.getState(),
subtaskState.getState());
+ foundMatch = true;
+ break;
+ }
+ }
+
+ assertTrue("Did not find TaskState for " +
stateForTask, foundMatch);
+ }
+ }
+
+ /**
+ * Tests loading with unexpected file types.
+ */
+ @Test
+ public void testUnexpectedFileType() throws Exception {
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+
+ // Random file
+ Path filePath = new Path(tmp.getRoot().getPath(),
UUID.randomUUID().toString());
+ FSDataOutputStream fdos =
FileSystem.get(filePath.toUri()).create(filePath, false);
+ DataOutputStream dos = new DataOutputStream(fdos);
+ for (int i = 0; i < 10; i++) {
+ dos.writeLong(ThreadLocalRandom.current().nextLong());
+ }
+
+ try {
+ store.loadSavepoint(filePath.toString());
+ fail("Did not throw expected Exception");
+ } catch (IOException e) {
+ // Unexpected magic number
+ assertTrue(e.getCause() instanceof
IllegalStateException);
+ }
+
+ // Very short file (EOFException)
+ filePath = new Path(tmp.getRoot().getPath(),
UUID.randomUUID().toString());
+ fdos = FileSystem.get(filePath.toUri()).create(filePath, false);
+ dos = new DataOutputStream(fdos);
+
+ dos.writeByte(0);
+
+ try {
+ store.loadSavepoint(filePath.toString());
+ fail("Did not throw expected Exception");
+ } catch (IOException e) {
+ // Unexpected magic number
+ assertTrue(e.getCause() instanceof EOFException);
+ }
+ }
+
+ /**
+ * Tests addition of a new savepoint version.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleSavepointVersions() throws Exception {
+ Field field =
SavepointSerializers.class.getDeclaredField("SERIALIZERS");
+ field.setAccessible(true);
+ Map<Integer, SavepointSerializer<?>> serializers =
(Map<Integer, SavepointSerializer<?>>) field.get(null);
+
+ assertTrue(serializers.size() >= 1);
+
+ FsSavepointStore store = new
FsSavepointStore(tmp.getRoot().getPath(), "fs-savepoint-store-test-");
+ assertEquals(0, tmp.getRoot().listFiles().length);
+
+ // New savepoint type for test
+ int version = ThreadLocalRandom.current().nextInt();
+ long checkpointId = ThreadLocalRandom.current().nextLong();
+
+ // Add serializer
+ serializers.put(version, NewSavepointSerializer.INSTANCE);
+
+ TestSavepoint newSavepoint = new TestSavepoint(version,
checkpointId);
+ String pathNewSavepoint = store.storeSavepoint(newSavepoint);
+ assertEquals(1, tmp.getRoot().listFiles().length);
+
+ // Savepoint v0
+ Savepoint savepoint = new SavepointV1(checkpointId,
SavepointV1Test.createTaskStates(4, 32));
+ String pathSavepoint = store.storeSavepoint(savepoint);
+ assertEquals(2, tmp.getRoot().listFiles().length);
+
+ // Load
+ Savepoint loaded = store.loadSavepoint(pathNewSavepoint);
+ assertEquals(newSavepoint, loaded);
+
+ loaded = store.loadSavepoint(pathSavepoint);
+ assertEquals(savepoint, loaded);
+ }
+
+ /**
+ * Tests that an exception during store cleans up any created files.
--- End diff --
Shouldn't we check that only the failed savepoint is cleaned up?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---