[FLINK-5763] [checkpoints] Followup on adding CheckpointOptions

  - Add a test that validates the checkpoint type ordinals are not changed
  - Change target location writing from 'writeUtf' to 'StringUtils.write'.
  - Pull out the coding charset as a constant in 'EventSerializer'
  - Simplify the directory creation in 'SavepointStore'


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df16e50b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df16e50b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df16e50b

Branch: refs/heads/master
Commit: df16e50bbf01d26f75b7745dacd5779ad47dcce5
Parents: 6e7a917
Author: Stephan Ewen <[email protected]>
Authored: Wed Feb 22 14:11:49 2017 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../flink/core/io/IOReadableWritable.java       |  7 +-
 .../java/org/apache/flink/util/StringUtils.java | 47 +++++++++++--
 .../checkpoint/savepoint/SavepointStore.java    | 70 ++++++++++----------
 .../io/network/api/CheckpointBarrier.java       | 16 +++--
 .../api/serialization/EventSerializer.java      | 17 ++---
 .../runtime/util/DataInputDeserializer.java     |  6 +-
 .../runtime/checkpoint/CheckpointTypeTest.java  | 42 ++++++++++++
 .../io/network/api/CheckpointBarrierTest.java   |  2 +-
 8 files changed, 145 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java 
b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
index a192a21..a38952e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java
@@ -27,9 +27,10 @@ import org.apache.flink.core.memory.DataOutputView;
 /**
  * This interface must be implemented by every class whose objects have to be 
serialized to their binary representation
  * and vice-versa. In particular, records have to implement this interface in 
order to specify how their data can be
- * transfered
- * to a binary representation.
- * When implementing this Interface make sure that the implementing class has 
a default (zero-argument) constructor!
+ * transferred to a binary representation.
+ * 
+ * <p>When implementing this Interface make sure that the implementing class 
has a default
+ * (zero-argument) constructor!
  */
 @Public
 public interface IOReadableWritable {

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java 
b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index 3c32d77..fc945c6 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -27,6 +27,11 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.types.StringValue;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utility class to convert objects into strings in vice-versa.
  */
@@ -302,19 +307,46 @@ public final class StringUtils {
                }
                return new String(data);
        }
-       
+
+       /**
+        * Writes a String to the given output.
+        * The written string can be read with {@link 
#readNullableString(DataInputView)}.
+        *
+        * @param str The string to write
+        * @param out The output to write to
+        *   
+        * @throws IOException Thrown, if the writing or the serialization 
fails.
+        */
+       public static void writeString(@Nonnull String str, DataOutputView out) 
throws IOException {
+               checkNotNull(str);
+               StringValue.writeString(str, out);
+       }
+
+       /**
+        * Reads a non-null String from the given input.
+        *
+        * @param in The input to read from
+        * @return The deserialized String
+        * 
+        * @throws IOException Thrown, if the reading or the deserialization 
fails.
+        */
+       public static String readString(DataInputView in) throws IOException {
+               return StringValue.readString(in);
+       }
+
        /**
         * Writes a String to the given output. The string may be null.
         * The written string can be read with {@link 
#readNullableString(DataInputView)}-
         * 
         * @param str The string to write, or null.
         * @param out The output to write to.
-        * @throws IOException Throws if the writing or the serialization fails.
+        * 
+        * @throws IOException Thrown, if the writing or the serialization 
fails.
         */
-       public static void writeNullableString(String str, DataOutputView out) 
throws IOException {
+       public static void writeNullableString(@Nullable String str, 
DataOutputView out) throws IOException {
                if (str != null) {
                        out.writeBoolean(true);
-                       StringValue.writeString(str, out);
+                       writeString(str, out);
                } else {
                        out.writeBoolean(false);
                }
@@ -326,11 +358,12 @@ public final class StringUtils {
         * 
         * @param in The input to read from.
         * @return The deserialized string, or null.
-        * @throws IOException Throws if the reading or the deserialization 
fails.
+        * 
+        * @throws IOException Thrown, if the reading or the deserialization 
fails.
         */
-       public static String readNullableString(DataInputView in) throws 
IOException {
+       public static @Nullable String readNullableString(DataInputView in) 
throws IOException {
                if (in.readBoolean()) {
-                       return StringValue.readString(in);
+                       return readString(in);
                } else {
                        return null;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 0caf5b2..95370a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,24 +18,27 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utilities for storing and loading savepoint meta data files.
  *
@@ -65,7 +68,10 @@ public class SavepointStore {
         * @throws IOException FileSystem operation failures are forwarded
         */
        public static String createSavepointDirectory(@Nonnull String 
baseDirectory, @Nullable JobID jobId) throws IOException {
-               String prefix;
+               final Path basePath = new Path(baseDirectory);
+               final FileSystem fs = basePath.getFileSystem();
+
+               final String prefix;
                if (jobId == null) {
                        prefix = "savepoint-";
                } else {
@@ -73,33 +79,21 @@ public class SavepointStore {
                }
 
                Exception latestException = null;
-               Path savepointDirectory = null;
-
-               FileSystem fs = null;
 
                // Try to create a FS output stream
                for (int attempt = 0; attempt < 10; attempt++) {
-                       Path path = new Path(baseDirectory, 
FileUtils.getRandomFilename(prefix));
-
-                       if (fs == null) {
-                               fs = FileSystem.get(path.toUri());
-                       }
+                       Path path = new Path(basePath, 
FileUtils.getRandomFilename(prefix));
 
                        try {
                                if (fs.mkdirs(path)) {
-                                       savepointDirectory = path;
-                                       break;
+                                       return path.toString();
                                }
                        } catch (Exception e) {
                                latestException = e;
                        }
                }
 
-               if (savepointDirectory == null) {
-                       throw new IOException("Failed to create savepoint 
directory at " + baseDirectory, latestException);
-               } else {
-                       return savepointDirectory.getPath();
-               }
+               throw new IOException("Failed to create savepoint directory at 
" + baseDirectory, latestException);
        }
 
        /**
@@ -121,20 +115,22 @@ public class SavepointStore {
         * @param directory Target directory to store savepoint in
         * @param savepoint Savepoint to be stored
         * @return Path of stored savepoint
-        * @throws Exception Failures during store are forwarded
+        * @throws IOException Failures during store are forwarded
         */
        public static <T extends Savepoint> String storeSavepoint(String 
directory, T savepoint) throws IOException {
                checkNotNull(directory, "Target directory");
                checkNotNull(savepoint, "Savepoint");
 
-               Path basePath = new Path(directory);
-               FileSystem fs = FileSystem.get(basePath.toUri());
+               final Path basePath = new Path(directory);
+               final Path metadataFilePath = new Path(basePath, 
META_DATA_FILE);
 
-               Path path = new Path(basePath, META_DATA_FILE);
-               FSDataOutputStream fdos = fs.create(path, false);
+               final FileSystem fs = FileSystem.get(basePath.toUri());
 
                boolean success = false;
-               try (DataOutputStream dos = new DataOutputStream(fdos)) {
+               try (FSDataOutputStream fdos = fs.create(metadataFilePath, 
WriteMode.NO_OVERWRITE); 
+                               DataOutputStream dos = new 
DataOutputStream(fdos))
+               {
+
                        // Write header
                        dos.writeInt(MAGIC_NUMBER);
                        dos.writeInt(savepoint.getVersion());
@@ -143,14 +139,18 @@ public class SavepointStore {
                        SavepointSerializer<T> serializer = 
SavepointSerializers.getSerializer(savepoint);
                        serializer.serialize(savepoint, dos);
                        success = true;
-               } finally {
-                       if (!success && fs.exists(path)) {
-                               if (!fs.delete(path, true)) {
-                                       LOG.warn("Failed to delete file {} 
after failed write.", path);
+               }
+               finally {
+                       if (!success && fs.exists(metadataFilePath)) {
+                               if (!fs.delete(metadataFilePath, true)) {
+                                       LOG.warn("Failed to delete file {} 
after failed metadata write.", metadataFilePath);
                                }
                        }
                }
 
+               // we return the savepoint directory path here!
+               // The directory path also works to resume from and is more 
elegant than the direct
+               // metadata file pointer
                return basePath.toString();
        }
 
@@ -159,7 +159,7 @@ public class SavepointStore {
         *
         * @param savepointFileOrDirectory Path to the parent savepoint 
directory or the meta data file.
         * @return The loaded savepoint
-        * @throws Exception Failures during load are forwared
+        * @throws IOException Failures during load are forwarded
         */
        public static Savepoint loadSavepoint(String savepointFileOrDirectory, 
ClassLoader userClassLoader) throws IOException {
                Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
@@ -207,7 +207,7 @@ public class SavepointStore {
         * Removes the savepoint meta data w/o loading and disposing it.
         *
         * @param path Path of savepoint to remove
-        * @throws Exception Failures during disposal are forwarded
+        * @throws IOException Failures during disposal are forwarded
         */
        public static void removeSavepointFile(String path) throws IOException {
                Preconditions.checkNotNull(path, "Path");

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 0752897..a42c25d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api;
 
 import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 
 import java.io.IOException;
 
@@ -28,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
+import org.apache.flink.util.StringUtils;
 
 /**
  * Checkpoint barriers are used to align checkpoints throughout the streaming 
topology. The
@@ -37,12 +39,12 @@ import org.apache.flink.runtime.event.RuntimeEvent;
  * 
  * <p>Once an operator has received a checkpoint barrier from all its input 
channels, it
  * knows that a certain checkpoint is complete. It can trigger the operator 
specific checkpoint
- * behavior and broadcast the barrier to downstream operators.</p>
+ * behavior and broadcast the barrier to downstream operators.
  * 
  * <p>Depending on the semantic guarantees, may hold off post-checkpoint data 
until the checkpoint
- * is complete (exactly once)</p>
+ * is complete (exactly once).
  * 
- * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p>
+ * <p>The checkpoint barrier IDs are strictly monotonous increasing.
  */
 public class CheckpointBarrier extends RuntimeEvent {
 
@@ -86,8 +88,8 @@ public class CheckpointBarrier extends RuntimeEvent {
                        return;
                } else if (checkpointType == CheckpointType.SAVEPOINT) {
                        String targetLocation = 
checkpointOptions.getTargetLocation();
-                       assert(targetLocation != null);
-                       out.writeUTF(targetLocation);
+                       checkState(targetLocation != null);
+                       StringUtils.writeString(targetLocation, out);
                } else {
                        throw new IOException("Unknown CheckpointType " + 
checkpointType);
                }
@@ -99,13 +101,13 @@ public class CheckpointBarrier extends RuntimeEvent {
                timestamp = in.readLong();
 
                int typeOrdinal = in.readInt();
-               checkElementIndex(typeOrdinal, CheckpointType.values().length, 
"Unknown CheckpointType ordinal " + typeOrdinal);
+               checkElementIndex(typeOrdinal, CheckpointType.values().length, 
"Unknown CheckpointType ordinal");
                CheckpointType checkpointType = 
CheckpointType.values()[typeOrdinal];
 
                if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
                        checkpointOptions = 
CheckpointOptions.forFullCheckpoint();
                } else if (checkpointType == CheckpointType.SAVEPOINT) {
-                       String targetLocation = in.readUTF();
+                       String targetLocation = StringUtils.readString(in);
                        checkpointOptions = 
CheckpointOptions.forSavepoint(targetLocation);
                } else {
                        throw new IOException("Illegal CheckpointType " + 
checkpointType);

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 223cbfe..3adf864 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -44,6 +44,8 @@ import org.apache.flink.util.Preconditions;
  */
 public class EventSerializer {
 
+       private static final Charset STRING_CODING_CHARSET = 
Charset.forName("UTF-8");
+
        private static final int END_OF_PARTITION_EVENT = 0;
 
        private static final int CHECKPOINT_BARRIER_EVENT = 1;
@@ -77,16 +79,16 @@ public class EventSerializer {
                        } else if (checkpointType == CheckpointType.SAVEPOINT) {
                                String targetLocation = 
checkpointOptions.getTargetLocation();
                                assert(targetLocation != null);
-                               byte[] bytes = 
targetLocation.getBytes(Charset.forName("UTF-8"));
+                               byte[] locationBytes = 
targetLocation.getBytes(STRING_CODING_CHARSET);
 
-                               buf = ByteBuffer.allocate(24 + 4 + 
bytes.length);
+                               buf = ByteBuffer.allocate(24 + 4 + 
locationBytes.length);
                                buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
                                buf.putLong(4, barrier.getId());
                                buf.putLong(12, barrier.getTimestamp());
                                buf.putInt(20, checkpointType.ordinal());
-                               buf.putInt(24, bytes.length);
-                               for (int i = 0; i < bytes.length; i++) {
-                                       buf.put(28 + i, bytes[i]);
+                               buf.putInt(24, locationBytes.length);
+                               for (int i = 0; i < locationBytes.length; i++) {
+                                       buf.put(28 + i, locationBytes[i]);
                                }
                        } else {
                                throw new IOException("Unknown checkpoint type: 
" + checkpointType);
@@ -204,8 +206,7 @@ public class EventSerializer {
                                CheckpointOptions checkpointOptions;
 
                                int checkpointTypeOrdinal = buffer.getInt();
-                               Preconditions.checkElementIndex(type, 
CheckpointType.values().length,
-                                       "Illegal CheckpointType ordinal " + 
checkpointTypeOrdinal);
+                               Preconditions.checkElementIndex(type, 
CheckpointType.values().length, "Illegal CheckpointType ordinal");
                                CheckpointType checkpointType = 
CheckpointType.values()[checkpointTypeOrdinal];
 
                                if (checkpointType == 
CheckpointType.FULL_CHECKPOINT) {
@@ -214,7 +215,7 @@ public class EventSerializer {
                                        int len = buffer.getInt();
                                        byte[] bytes = new byte[len];
                                        buffer.get(bytes);
-                                       String targetLocation = new 
String(bytes, Charset.forName("UTF-8"));
+                                       String targetLocation = new 
String(bytes, STRING_CODING_CHARSET);
 
                                        checkpointOptions = 
CheckpointOptions.forSavepoint(targetLocation);
                                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 0f99496..4e8871a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -45,7 +45,11 @@ public class DataInputDeserializer implements DataInputView, 
java.io.Serializabl
        // 
------------------------------------------------------------------------
        
        public DataInputDeserializer() {}
-       
+
+       public DataInputDeserializer(byte[] buffer) {
+               setBuffer(buffer, 0, buffer.length);
+       }
+
        public DataInputDeserializer(byte[] buffer, int start, int len) {
                setBuffer(buffer, start, len);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
new file mode 100644
index 0000000..dfbde5e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CheckpointTypeTest {
+
+       /**
+        * This test validates that the order of enumeration constants is not 
changed, because the
+        * ordinal of that enum is used in serialization.
+        *
+        * <p>It is still possible to edit both the ordinal and this test, but 
the test adds
+        * a level of safety, and should make developers stumble over this when 
attempting
+        * to adjust the enumeration.
+        */
+       @Test
+       public void testOrdinalsAreConstant() {
+               assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal());
+               assertEquals(1, CheckpointType.SAVEPOINT.ordinal());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
index dd5b0b6..ad9fc16 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java
@@ -49,7 +49,7 @@ public class CheckpointBarrierTest {
                DataOutputSerializer out = new DataOutputSerializer(1024);
                barrier.write(out);
 
-               DataInputDeserializer in = new 
DataInputDeserializer(out.wrapAsByteBuffer());
+               DataInputDeserializer in = new 
DataInputDeserializer(out.getCopyOfBuffer());
                CheckpointBarrier deserialized = new CheckpointBarrier();
                deserialized.read(in);
 

Reply via email to