Repository: reef Updated Branches: refs/heads/master b7a98d795 -> 4392ab83b
[REEF-1659] Refactor checkpointing service for REEF Java. This patch refactors the 'reef-checkpoint' package for readability. JIRA: [REEF-1659](https://issues.apache.org/jira/browse/REEF-1659) Pull Request: This closes #1178 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/4392ab83 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/4392ab83 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/4392ab83 Branch: refs/heads/master Commit: 4392ab83badf077133bdaeabaaa7168f6588852b Parents: b7a98d7 Author: Sergey Dudoladov <[email protected]> Authored: Fri Nov 4 21:55:09 2016 +0100 Committer: Markus Weimer <[email protected]> Committed: Thu Nov 10 16:22:19 2016 -0800 ---------------------------------------------------------------------- .../apache/reef/io/checkpoint/CheckpointID.java | 7 ++- .../reef/io/checkpoint/CheckpointService.java | 34 ++++++++------ .../reef/io/checkpoint/RandomNameCNS.java | 24 ++++++++-- .../reef/io/checkpoint/SimpleNamingService.java | 5 +- .../fs/FSCheckPointServiceConfiguration.java | 6 ++- .../reef/io/checkpoint/fs/FSCheckpointID.java | 9 ++-- .../io/checkpoint/fs/FSCheckpointService.java | 49 ++++++++++++++------ .../reef/io/checkpoint/fs/package-info.java | 2 +- 8 files changed, 93 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java index b286878..927f714 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java +++ 
b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointID.java @@ -21,10 +21,9 @@ package org.apache.reef.io.checkpoint; import org.apache.hadoop.io.Writable; /** - * This class represent the identified (memento) for a checkpoint. It is allowed - * to contain small amount of metadata about a checkpoint and must provide sufficient - * information to the corresponding CheckpointService to locate and retrieve the - * data contained in the checkpoint. + * This class represent an identifier of a checkpoint. + * The identifier must provide sufficient information to the corresponding CheckpointService to locate and retrieve the + * data contained in the checkpoint. The identifier can also contain small amount of metadata about a checkpoint. */ public interface CheckpointID extends Writable { http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java index d776665..99f5bf4 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java +++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/CheckpointService.java @@ -25,11 +25,14 @@ import java.nio.channels.WritableByteChannel; /** * The CheckpointService provides a simple API to store and retrieve the state of a task. * <p> - * Checkpoints are atomic, single-writer, write-once, multiple-readers, ready-many type of objects. - * This is provided by releasing the CheckpointID for a checkpoint only upon commit of the checkpoint, - * and by preventing a checkpoint to be re-opened for writes. 
+ * Checkpoints are assumed to be atomic, single-writer, write-once, multiple-readers, read-many type of objects. + * + * To ensure this, any implementation of this interface must: + * 1) Return a CheckpointID to a client only + * upon the successful {@link #commit(CheckpointWriteChannel) commit} of the checkpoint. + * 2) Prevent any checkpoint from being re-opened for writes. * <p> - * Non-functional properties such as durability, availability, compression, garbage collection, + * Non-functional properties such as durability, availability, compression, garbage collection, and * quotas are left to the implementation. * <p> * This API is envisioned as the basic building block for a checkpoint service, on top of which richer @@ -39,10 +42,10 @@ import java.nio.channels.WritableByteChannel; public interface CheckpointService { /** - * This method creates a checkpoint and provide a channel to write to it. + * Creates a checkpoint and provides a channel to write to it. * The name/location of the checkpoint are unknown to the user as of this time, in fact, - * the CheckpointID is not released to the user until commit is called. This makes enforcing - * atomicity of writes easy. + * the CheckpointID is not released to the user until {@link #commit(CheckpointWriteChannel) commit} is called. + * This makes enforcing atomicity of writes easy. * * @return a CheckpointWriteChannel that can be used to write to the checkpoint * @throws IOException @@ -51,8 +54,11 @@ public interface CheckpointService { CheckpointWriteChannel create() throws IOException, InterruptedException; /** - * Used to finalize and existing checkpoint. It returns the CheckpointID that can be later - * used to access (read-only) this checkpoint. This guarantees atomicity of the checkpoint. + * Closes an existing checkpoint for writes and returns the CheckpointID that can be later + * used to get the read-only access to this checkpoint. 
+ * + * Implementation is supposed to return the CheckpointID to the caller only on the + * successful completion of checkpoint to guarantee atomicity of the checkpoint. * * @param channel the CheckpointWriteChannel to commit * @return a CheckpointID @@ -62,9 +68,9 @@ public interface CheckpointService { CheckpointID commit(CheckpointWriteChannel channel) throws IOException, InterruptedException; /** - * Dual to commit, it aborts the current checkpoint. Garbage collection choices are - * left to the implementation. The CheckpointID is not generated nor released to the - * user so the checkpoint is not accessible. + * Aborts the current checkpoint. Garbage collection choices are + * left to the implementation. The CheckpointID is neither generated nor released to the + * client so the checkpoint is not accessible. * * @param channel the CheckpointWriteChannel to abort * @throws IOException @@ -73,7 +79,7 @@ public interface CheckpointService { void abort(CheckpointWriteChannel channel) throws IOException, InterruptedException; /** - * Given a CheckpointID returns a reading channel. + * Returns a reading channel to a checkpoint identified by the CheckpointID. * * @param checkpointId CheckpointID for the checkpoint to be opened * @return a CheckpointReadChannel @@ -83,7 +89,7 @@ public interface CheckpointService { CheckpointReadChannel open(CheckpointID checkpointId) throws IOException, InterruptedException; /** - * It discards an existing checkpoint identified by its CheckpointID. + * Discards an existing checkpoint identified by its CheckpointID. 
* * @param checkpointId CheckpointID for the checkpoint to be deleted * @return a boolean confirming success of the deletion http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java index 7de31cd..50e44a7 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java +++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/RandomNameCNS.java @@ -26,20 +26,31 @@ import org.apache.reef.tang.annotations.Parameter; import javax.inject.Inject; /** - * Simple naming service that generates a random checkpoint name. + * A naming service that generates a random checkpoint name by appending a random alphanumeric string (suffix) + * of a given length to a user-supplied prefix string. 
*/ public class RandomNameCNS implements CheckpointNamingService { private final String prefix; + private final int lengthOfRandomSuffix; - @Inject + @Deprecated public RandomNameCNS(@Parameter(PREFIX.class) final String prefix) { this.prefix = prefix; + this.lengthOfRandomSuffix + = Integer.parseInt(LengthOfRandomSuffix.class.getAnnotation(NamedParameter.class).default_value()); + } + + @Inject + private RandomNameCNS(@Parameter(PREFIX.class) final String prefix, + @Parameter(LengthOfRandomSuffix.class) final int lengthOfRandomSuffix) { + this.prefix = prefix; + this.lengthOfRandomSuffix = lengthOfRandomSuffix; } @Override public String getNewName() { - return this.prefix + RandomStringUtils.randomAlphanumeric(8); + return this.prefix + RandomStringUtils.randomAlphanumeric(lengthOfRandomSuffix); } /** @@ -49,4 +60,11 @@ public class RandomNameCNS implements CheckpointNamingService { public static class PREFIX implements Name<String> { } + /** + * Number of alphanumeric characters in the random part of a checkpoint name. 
+ */ + @NamedParameter(doc = "Number of alphanumeric chars in the random part of a checkpoint name.", default_value = "8") + public static class LengthOfRandomSuffix implements Name<Integer> { + } + } http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java index 7b43380..823ff1e 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java +++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/SimpleNamingService.java @@ -26,6 +26,7 @@ import javax.inject.Inject; /** * A naming service that simply returns the name it has been initialized with. + * Note that the name is always the same. */ public class SimpleNamingService implements CheckpointNamingService { @@ -37,7 +38,7 @@ public class SimpleNamingService implements CheckpointNamingService { } /** - * Generate a new checkpoint Name. + * Generate a new checkpoint name. * * @return the checkpoint name */ @@ -49,7 +50,7 @@ public class SimpleNamingService implements CheckpointNamingService { /** * Prefix for checkpoints. 
*/ - @NamedParameter(doc = "Checkpoint prefix.", short_name = "checkpoint_prefix", default_value = "reef") + @NamedParameter(doc = "Checkpoint name.", short_name = "checkpoint_name", default_value = "reef") public static final class CheckpointName implements Name<String> { } } http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java index b48a8b9..c8464a6 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java +++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckPointServiceConfiguration.java @@ -39,7 +39,7 @@ import javax.inject.Inject; import java.io.IOException; /** - * ConfigurationModule for the FSCheckPointService. + * ConfigurationModule for the FSCheckpointService. * This can be used to create Evaluator-side configurations of the checkpointing service. */ @DriverSide @@ -65,11 +65,15 @@ public class FSCheckPointServiceConfiguration extends ConfigurationModuleBuilder * Prefix for checkpoint files (optional). 
*/ public static final OptionalParameter<String> PREFIX = new OptionalParameter<>(); + + public static final ConfigurationModule CONF = new FSCheckPointServiceConfiguration() + .bindImplementation(CheckpointService.class, FSCheckpointService.class) // Use the HDFS based checkpoints .bindImplementation(CheckpointNamingService.class, RandomNameCNS.class) // Use Random Names for the checkpoints .bindImplementation(CheckpointID.class, FSCheckpointID.class) .bindConstructor(FileSystem.class, FileSystemConstructor.class) + .bindNamedParameter(FileSystemConstructor.IsLocal.class, IS_LOCAL) .bindNamedParameter(FSCheckpointService.PATH.class, PATH) .bindNamedParameter(FSCheckpointService.ReplicationFactor.class, REPLICATION_FACTOR) http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java index f43cc67..1b97feb 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java +++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointID.java @@ -34,8 +34,9 @@ public class FSCheckpointID implements CheckpointID { private Path path; - public FSCheckpointID() { - } + // CheckpointID extends Hadoop Writable interface that enables serialization + // Java serialization requires a (potentially empty) public default constructor + public FSCheckpointID(){} public FSCheckpointID(final Path path) { this.path = path; @@ -62,8 +63,8 @@ public class FSCheckpointID implements CheckpointID { @Override public boolean equals(final Object other) { - return other instanceof FSCheckpointID - && path.equals(((FSCheckpointID) other).path); + return (other instanceof 
FSCheckpointID) + && path.equals(((FSCheckpointID) other).path); } @Override http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java index b7f6ec3..fcc65a2 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java +++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/FSCheckpointService.java @@ -39,10 +39,12 @@ import java.nio.channels.WritableByteChannel; /** * A FileSystem based CheckpointService. + * + * Note that this implementation creates a temporary file first and moves it to final destination at commit time. */ public class FSCheckpointService implements CheckpointService { - private final Path base; + private final Path basePath; private final FileSystem fs; private final CheckpointNamingService namingPolicy; private final short replication; @@ -53,7 +55,7 @@ public class FSCheckpointService implements CheckpointService { final CheckpointNamingService namingPolicy, @Parameter(ReplicationFactor.class) final short replication) { this.fs = fs; - this.base = new Path(basePath); + this.basePath = new Path(basePath); this.namingPolicy = namingPolicy; this.replication = replication; } @@ -63,7 +65,7 @@ public class FSCheckpointService implements CheckpointService { final CheckpointNamingService namingPolicy, final short replication) { this.fs = fs; - this.base = base; + this.basePath = base; this.namingPolicy = namingPolicy; this.replication = replication; } @@ -76,77 +78,95 @@ public class FSCheckpointService implements CheckpointService { throws IOException { final String name = namingPolicy.getNewName(); - 
final Path p = new Path(name); if (p.isUriPathAbsolute()) { - throw new IOException("Checkpoint cannot be an absolute path"); + throw new IOException("Checkpoint name cannot be an absolute path."); } - return createInternal(new Path(base, p)); + + return createInternal(new Path(basePath, p)); } + CheckpointWriteChannel createInternal(final Path name) throws IOException { - //create a temp file, fail if file exists + /* Create a temp file, fail if file exists. + The likely reason to do so (I am not the original author) is to check that the file is indeed writable. + Checking this directly via a file system call may lead to a time-of-check/time-of-use race condition. + See the pull request for REEF-1659 for discussion. + */ return new FSCheckpointWriteChannel(name, fs.create(tmpfile(name), replication)); } @Override public CheckpointReadChannel open(final CheckpointID id) throws IOException, InterruptedException { + if (!(id instanceof FSCheckpointID)) { throw new IllegalArgumentException( - "Mismatched checkpoint type: " + id.getClass()); + "Mismatched checkpoint id type. 
Expected FSCheckpointID, but actually got " + id.getClass()); } + return new FSCheckpointReadChannel( fs.open(((FSCheckpointID) id).getPath())); } @Override - public CheckpointID commit(final CheckpointWriteChannel ch) throws IOException, - InterruptedException { + public CheckpointID commit(final CheckpointWriteChannel ch) + throws IOException, InterruptedException { + if (ch.isOpen()) { ch.close(); } + final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch; final Path dst = hch.getDestination(); + if (!fs.rename(tmpfile(dst), dst)) { // attempt to clean up abort(ch); throw new IOException("Failed to promote checkpoint" + tmpfile(dst) + " -> " + dst); } + return new FSCheckpointID(hch.getDestination()); } @Override public void abort(final CheckpointWriteChannel ch) throws IOException { + if (ch.isOpen()) { ch.close(); } + final FSCheckpointWriteChannel hch = (FSCheckpointWriteChannel) ch; final Path tmp = tmpfile(hch.getDestination()); + try { if (!fs.delete(tmp, false)) { - throw new IOException("Failed to delete checkpoint during abort"); + throw new IOException("Failed to delete a temporary checkpoint file during abort. Path: " + tmp); } } catch (final FileNotFoundException ignored) { // IGNORE } + } @Override - public boolean delete(final CheckpointID id) throws IOException, - InterruptedException { + public boolean delete(final CheckpointID id) + throws IOException, InterruptedException { + if (!(id instanceof FSCheckpointID)) { throw new IllegalArgumentException( - "Mismatched checkpoint type: " + id.getClass()); + "Mismatched checkpoint id type. 
Expected FSCheckpointID, but actually got " + id.getClass()); } + final Path tmp = ((FSCheckpointID) id).getPath(); try { return fs.delete(tmp, false); } catch (final FileNotFoundException ignored) { // IGNORE } + return true; } @@ -160,6 +180,7 @@ public class FSCheckpointService implements CheckpointService { private static class FSCheckpointWriteChannel implements CheckpointWriteChannel { + private final Path finalDst; private final WritableByteChannel out; private boolean isOpen = true; http://git-wip-us.apache.org/repos/asf/reef/blob/4392ab83/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java index c63e054..8eb797c 100644 --- a/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java +++ b/lang/java/reef-checkpoint/src/main/java/org/apache/reef/io/checkpoint/fs/package-info.java @@ -17,6 +17,6 @@ * under the License. */ /** - * FileSystem based checkpoints. + * Provides an example implementation of a CheckpointService based on a file system. */ package org.apache.reef.io.checkpoint.fs;
