[FLINK-5763] [checkpoints] Followup on adding CheckpointOptions - Add a test that validates the checkpoint type ordinals are not changed - Change target location writing from 'writeUTF' to 'StringUtils.writeString'. - Pull out the coding charset as a constant in 'EventSerializer' - Simplify the directory creation in 'SavepointStore'
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/df16e50b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/df16e50b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/df16e50b Branch: refs/heads/master Commit: df16e50bbf01d26f75b7745dacd5779ad47dcce5 Parents: 6e7a917 Author: Stephan Ewen <[email protected]> Authored: Wed Feb 22 14:11:49 2017 +0100 Committer: Stephan Ewen <[email protected]> Committed: Thu Feb 23 18:39:49 2017 +0100 ---------------------------------------------------------------------- .../flink/core/io/IOReadableWritable.java | 7 +- .../java/org/apache/flink/util/StringUtils.java | 47 +++++++++++-- .../checkpoint/savepoint/SavepointStore.java | 70 ++++++++++---------- .../io/network/api/CheckpointBarrier.java | 16 +++-- .../api/serialization/EventSerializer.java | 17 ++--- .../runtime/util/DataInputDeserializer.java | 6 +- .../runtime/checkpoint/CheckpointTypeTest.java | 42 ++++++++++++ .../io/network/api/CheckpointBarrierTest.java | 2 +- 8 files changed, 145 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java index a192a21..a38952e 100644 --- a/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java +++ b/flink-core/src/main/java/org/apache/flink/core/io/IOReadableWritable.java @@ -27,9 +27,10 @@ import org.apache.flink.core.memory.DataOutputView; /** * This interface must be implemented by every class whose objects have to be serialized to their binary representation * and vice-versa. 
In particular, records have to implement this interface in order to specify how their data can be - * transfered - * to a binary representation. - * When implementing this Interface make sure that the implementing class has a default (zero-argument) constructor! + * transferred to a binary representation. + * + * <p>When implementing this Interface make sure that the implementing class has a default + * (zero-argument) constructor! */ @Public public interface IOReadableWritable { http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java index 3c32d77..fc945c6 100644 --- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java @@ -27,6 +27,11 @@ import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.StringValue; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Utility class to convert objects into strings in vice-versa. */ @@ -302,19 +307,46 @@ public final class StringUtils { } return new String(data); } - + + /** + * Writes a String to the given output. + * The written string can be read with {@link #readString(DataInputView)}. + * + * @param str The string to write + * @param out The output to write to + * + * @throws IOException Thrown, if the writing or the serialization fails. + */ + public static void writeString(@Nonnull String str, DataOutputView out) throws IOException { + checkNotNull(str); + StringValue.writeString(str, out); + } + + /** + * Reads a non-null String from the given input.
+ * + * @param in The input to read from + * @return The deserialized String + * + * @throws IOException Thrown, if the reading or the deserialization fails. + */ + public static String readString(DataInputView in) throws IOException { + return StringValue.readString(in); + } + /** * Writes a String to the given output. The string may be null. * The written string can be read with {@link #readNullableString(DataInputView)}- * * @param str The string to write, or null. * @param out The output to write to. - * @throws IOException Throws if the writing or the serialization fails. + * + * @throws IOException Thrown, if the writing or the serialization fails. */ - public static void writeNullableString(String str, DataOutputView out) throws IOException { + public static void writeNullableString(@Nullable String str, DataOutputView out) throws IOException { if (str != null) { out.writeBoolean(true); - StringValue.writeString(str, out); + writeString(str, out); } else { out.writeBoolean(false); } @@ -326,11 +358,12 @@ public final class StringUtils { * * @param in The input to read from. * @return The deserialized string, or null. - * @throws IOException Throws if the reading or the deserialization fails. + * + * @throws IOException Thrown, if the reading or the deserialization fails. 
*/ - public static String readNullableString(DataInputView in) throws IOException { + public static @Nullable String readNullableString(DataInputView in) throws IOException { if (in.readBoolean()) { - return StringValue.readString(in); + return readString(in); } else { return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java index 0caf5b2..95370a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java @@ -18,24 +18,27 @@ package org.apache.flink.runtime.checkpoint.savepoint; -import static org.apache.flink.util.Preconditions.checkNotNull; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import org.apache.flink.api.common.JobID; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystem.WriteMode; import org.apache.flink.core.fs.Path; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Utilities for storing and 
loading savepoint meta data files. * @@ -65,7 +68,10 @@ public class SavepointStore { * @throws IOException FileSystem operation failures are forwarded */ public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException { - String prefix; + final Path basePath = new Path(baseDirectory); + final FileSystem fs = basePath.getFileSystem(); + + final String prefix; if (jobId == null) { prefix = "savepoint-"; } else { @@ -73,33 +79,21 @@ public class SavepointStore { } Exception latestException = null; - Path savepointDirectory = null; - - FileSystem fs = null; // Try to create a FS output stream for (int attempt = 0; attempt < 10; attempt++) { - Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix)); - - if (fs == null) { - fs = FileSystem.get(path.toUri()); - } + Path path = new Path(basePath, FileUtils.getRandomFilename(prefix)); try { if (fs.mkdirs(path)) { - savepointDirectory = path; - break; + return path.toString(); } } catch (Exception e) { latestException = e; } } - if (savepointDirectory == null) { - throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException); - } else { - return savepointDirectory.getPath(); - } + throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException); } /** @@ -121,20 +115,22 @@ public class SavepointStore { * @param directory Target directory to store savepoint in * @param savepoint Savepoint to be stored * @return Path of stored savepoint - * @throws Exception Failures during store are forwarded + * @throws IOException Failures during store are forwarded */ public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException { checkNotNull(directory, "Target directory"); checkNotNull(savepoint, "Savepoint"); - Path basePath = new Path(directory); - FileSystem fs = FileSystem.get(basePath.toUri()); + final Path basePath = new Path(directory); + final 
Path metadataFilePath = new Path(basePath, META_DATA_FILE); - Path path = new Path(basePath, META_DATA_FILE); - FSDataOutputStream fdos = fs.create(path, false); + final FileSystem fs = FileSystem.get(basePath.toUri()); boolean success = false; - try (DataOutputStream dos = new DataOutputStream(fdos)) { + try (FSDataOutputStream fdos = fs.create(metadataFilePath, WriteMode.NO_OVERWRITE); + DataOutputStream dos = new DataOutputStream(fdos)) + { + // Write header dos.writeInt(MAGIC_NUMBER); dos.writeInt(savepoint.getVersion()); @@ -143,14 +139,18 @@ public class SavepointStore { SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(savepoint); serializer.serialize(savepoint, dos); success = true; - } finally { - if (!success && fs.exists(path)) { - if (!fs.delete(path, true)) { - LOG.warn("Failed to delete file {} after failed write.", path); + } + finally { + if (!success && fs.exists(metadataFilePath)) { + if (!fs.delete(metadataFilePath, true)) { + LOG.warn("Failed to delete file {} after failed metadata write.", metadataFilePath); } } } + // we return the savepoint directory path here! + // The directory path also works to resume from and is more elegant than the direct + // metadata file pointer return basePath.toString(); } @@ -159,7 +159,7 @@ public class SavepointStore { * * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file. * @return The loaded savepoint - * @throws Exception Failures during load are forwared + * @throws IOException Failures during load are forwarded */ public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException { Preconditions.checkNotNull(savepointFileOrDirectory, "Path"); @@ -207,7 +207,7 @@ public class SavepointStore { * Removes the savepoint meta data w/o loading and disposing it. 
* * @param path Path of savepoint to remove - * @throws Exception Failures during disposal are forwarded + * @throws IOException Failures during disposal are forwarded */ public static void removeSavepointFile(String path) throws IOException { Preconditions.checkNotNull(path, "Path"); http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java index 0752897..a42c25d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.api; import static org.apache.flink.util.Preconditions.checkElementIndex; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; import java.io.IOException; @@ -28,6 +29,7 @@ import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; import org.apache.flink.runtime.event.RuntimeEvent; +import org.apache.flink.util.StringUtils; /** * Checkpoint barriers are used to align checkpoints throughout the streaming topology. The @@ -37,12 +39,12 @@ import org.apache.flink.runtime.event.RuntimeEvent; * * <p>Once an operator has received a checkpoint barrier from all its input channels, it * knows that a certain checkpoint is complete. 
It can trigger the operator specific checkpoint - * behavior and broadcast the barrier to downstream operators.</p> + * behavior and broadcast the barrier to downstream operators. * * <p>Depending on the semantic guarantees, may hold off post-checkpoint data until the checkpoint - * is complete (exactly once)</p> + * is complete (exactly once). * - * <p>The checkpoint barrier IDs are strictly monotonous increasing.</p> + * <p>The checkpoint barrier IDs are strictly monotonically increasing. */ public class CheckpointBarrier extends RuntimeEvent { @@ -86,8 +88,8 @@ public class CheckpointBarrier extends RuntimeEvent { return; } else if (checkpointType == CheckpointType.SAVEPOINT) { String targetLocation = checkpointOptions.getTargetLocation(); - assert(targetLocation != null); - out.writeUTF(targetLocation); + checkState(targetLocation != null); + StringUtils.writeString(targetLocation, out); } else { throw new IOException("Unknown CheckpointType " + checkpointType); } @@ -99,13 +101,13 @@ public class CheckpointBarrier extends RuntimeEvent { timestamp = in.readLong(); int typeOrdinal = in.readInt(); - checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal); + checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal"); CheckpointType checkpointType = CheckpointType.values()[typeOrdinal]; if (checkpointType == CheckpointType.FULL_CHECKPOINT) { checkpointOptions = CheckpointOptions.forFullCheckpoint(); } else if (checkpointType == CheckpointType.SAVEPOINT) { - String targetLocation = in.readUTF(); + String targetLocation = StringUtils.readString(in); checkpointOptions = CheckpointOptions.forSavepoint(targetLocation); } else { throw new IOException("Illegal CheckpointType " + checkpointType); http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java index 223cbfe..3adf864 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java @@ -44,6 +44,8 @@ import org.apache.flink.util.Preconditions; */ public class EventSerializer { + private static final Charset STRING_CODING_CHARSET = Charset.forName("UTF-8"); + private static final int END_OF_PARTITION_EVENT = 0; private static final int CHECKPOINT_BARRIER_EVENT = 1; @@ -77,16 +79,16 @@ public class EventSerializer { } else if (checkpointType == CheckpointType.SAVEPOINT) { String targetLocation = checkpointOptions.getTargetLocation(); assert(targetLocation != null); - byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8")); + byte[] locationBytes = targetLocation.getBytes(STRING_CODING_CHARSET); - buf = ByteBuffer.allocate(24 + 4 + bytes.length); + buf = ByteBuffer.allocate(24 + 4 + locationBytes.length); buf.putInt(0, CHECKPOINT_BARRIER_EVENT); buf.putLong(4, barrier.getId()); buf.putLong(12, barrier.getTimestamp()); buf.putInt(20, checkpointType.ordinal()); - buf.putInt(24, bytes.length); - for (int i = 0; i < bytes.length; i++) { - buf.put(28 + i, bytes[i]); + buf.putInt(24, locationBytes.length); + for (int i = 0; i < locationBytes.length; i++) { + buf.put(28 + i, locationBytes[i]); } } else { throw new IOException("Unknown checkpoint type: " + checkpointType); @@ -204,8 +206,7 @@ public class EventSerializer { CheckpointOptions checkpointOptions; int checkpointTypeOrdinal = buffer.getInt(); - Preconditions.checkElementIndex(type, CheckpointType.values().length, - "Illegal CheckpointType ordinal " + 
checkpointTypeOrdinal); + Preconditions.checkElementIndex(type, CheckpointType.values().length, "Illegal CheckpointType ordinal"); CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal]; if (checkpointType == CheckpointType.FULL_CHECKPOINT) { @@ -214,7 +215,7 @@ public class EventSerializer { int len = buffer.getInt(); byte[] bytes = new byte[len]; buffer.get(bytes); - String targetLocation = new String(bytes, Charset.forName("UTF-8")); + String targetLocation = new String(bytes, STRING_CODING_CHARSET); checkpointOptions = CheckpointOptions.forSavepoint(targetLocation); } else { http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java index 0f99496..4e8871a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java @@ -45,7 +45,11 @@ public class DataInputDeserializer implements DataInputView, java.io.Serializabl // ------------------------------------------------------------------------ public DataInputDeserializer() {} - + + public DataInputDeserializer(byte[] buffer) { + setBuffer(buffer, 0, buffer.length); + } + public DataInputDeserializer(byte[] buffer, int start, int len) { setBuffer(buffer, start, len); } http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java new file mode 100644 index 0000000..dfbde5e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointTypeTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CheckpointTypeTest { + + /** + * This test validates that the order of enumeration constants is not changed, because the + * ordinal of that enum is used in serialization. + * + * <p>It is still possible to edit both the ordinal and this test, but the test adds + * a level of safety, and should make developers stumble over this when attempting + * to adjust the enumeration. 
+ */ + @Test + public void testOrdinalsAreConstant() { + assertEquals(0, CheckpointType.FULL_CHECKPOINT.ordinal()); + assertEquals(1, CheckpointType.SAVEPOINT.ordinal()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df16e50b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java index dd5b0b6..ad9fc16 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/CheckpointBarrierTest.java @@ -49,7 +49,7 @@ public class CheckpointBarrierTest { DataOutputSerializer out = new DataOutputSerializer(1024); barrier.write(out); - DataInputDeserializer in = new DataInputDeserializer(out.wrapAsByteBuffer()); + DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer()); CheckpointBarrier deserialized = new CheckpointBarrier(); deserialized.read(in);
