Repository: reef
Updated Branches:
  refs/heads/master 6cde8ce78 -> 16ca43efe


[REEF-1412] Use a two-file approach for DFSEvaluatorLogOverwriteWriter

This addressed the issue by
  * Modifying DFSEvaluatorLogAppendWriter to DFSEvaluatorLogAppendReaderWriter
    such that it is aware of both read and write file paths.
  * Adding helper DFSLineReader to assist in reading DFS files.
  * Increasing number of Evaluator requests in HelloRestartDriver to test the
    functionality of writing longer files.

JIRA:
  [REEF-1412](https://issues.apache.org/jira/browse/REEF-1412)

Pull Request:
  This closes #1023


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/16ca43ef
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/16ca43ef
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/16ca43ef

Branch: refs/heads/master
Commit: 16ca43efe49269e7b01b3b5783e26b09ba1adc19
Parents: 6cde8ce
Author: Andrew Chung <[email protected]>
Authored: Thu Jun 2 11:51:55 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Jun 21 12:43:14 2016 -0700

----------------------------------------------------------------------
 .../DriverRestart/HelloRestartDriver.cs         |   4 +-
 .../DFSEvaluatorLogAppendReaderWriter.java      |  86 +++++++++
 .../restart/DFSEvaluatorLogAppendWriter.java    |  79 --------
 .../DFSEvaluatorLogOverwriteReaderWriter.java   | 192 +++++++++++++++++++
 .../restart/DFSEvaluatorLogOverwriteWriter.java | 104 ----------
 .../restart/DFSEvaluatorLogReaderWriter.java    |  46 +++++
 .../driver/restart/DFSEvaluatorLogWriter.java   |  39 ----
 .../driver/restart/DFSEvaluatorPreserver.java   |  35 ++--
 .../yarn/driver/restart/DFSLineReader.java      | 136 +++++++++++++
 .../org/apache/reef/util/CloseableIterable.java |  25 +++
 10 files changed, 500 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs 
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
index 8e5c706..d02a614 100644
--- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -47,8 +47,8 @@ namespace Org.Apache.REEF.Examples.DriverRestart
         IObserver<IFailedEvaluator>
     {
         private static readonly Logger Logger = 
Logger.GetLogger(typeof(HelloRestartDriver));
-        private const int NumberOfTasksToSubmit = 1;
-        private const int NumberOfTasksToSubmitOnRestart = 1;
+        private const int NumberOfTasksToSubmit = 3;
+        private const int NumberOfTasksToSubmitOnRestart = 3;
 
         private readonly IEvaluatorRequestor _evaluatorRequestor;
 

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
new file mode 100644
index 0000000..d5023e8
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Evaluator log reader/writer for a DFS that supports native append.
+ * dfs.support.append should be true.
+ */
+@Private
+public final class DFSEvaluatorLogAppendReaderWriter implements DFSEvaluatorLogReaderWriter {
+
+  private final FileSystem fileSystem;
+  private final Path changelogPath;
+  private final DFSLineReader reader;
+
+  private boolean fsClosed = false;
+
+  DFSEvaluatorLogAppendReaderWriter(final FileSystem fileSystem, final Path changelogPath) {
+    this.fileSystem = fileSystem;
+    this.changelogPath = changelogPath;
+    this.reader = new DFSLineReader(fileSystem);
+  }
+
+  /**
+   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
+   * If the log already exists the entry is appended to it, otherwise the log is created with the
+   * entry as its first content. The entry is written as given and UTF-8 encoded.
+   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
+   * @throws IOException
+   */
+  @Override
+  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
+    final boolean logExists = this.fileSystem.exists(this.changelogPath);
+
+    try (final BufferedWriter logWriter = new BufferedWriter(new OutputStreamWriter(
+        logExists ? this.fileSystem.append(this.changelogPath) : this.fileSystem.create(this.changelogPath),
+        StandardCharsets.UTF_8))) {
+      logWriter.write(formattedEntry);
+    }
+  }
+
+  /**
+   * Reads the evaluator log through the {@link DFSLineReader}.
+   * @return the lines of the change log file.
+   * @throws IOException
+   */
+  @Override
+  public CloseableIterable<String> readFromEvaluatorLog() throws IOException {
+    return this.reader.readLinesFromFile(this.changelogPath);
+  }
+
+  /**
+   * Closes the FileSystem. Calls after the first successful close do nothing.
+   * @throws Exception
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.fileSystem == null || this.fsClosed) {
+      return;
+    }
+
+    this.fileSystem.close();
+    this.fsClosed = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
deleted file mode 100644
index 7d07939..0000000
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver.restart;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.reef.annotations.audience.Private;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-
-/**
- * The DFS evaluator logger that performs regular append. dfs.support.append 
should be true.
- */
-@Private
-public final class DFSEvaluatorLogAppendWriter implements 
DFSEvaluatorLogWriter {
-
-  private final FileSystem fileSystem;
-
-  private final Path changelogPath;
-
-  private boolean fsClosed = false;
-
-  DFSEvaluatorLogAppendWriter(final FileSystem fileSystem, final Path 
changelogPath) {
-    this.fileSystem = fileSystem;
-    this.changelogPath = changelogPath;
-  }
-
-  /**
-   * Writes a formatted entry (addition or removal) for an Evaluator ID into 
the DFS evaluator log.
-   * The entry is appended regularly by an FS that supports append.
-   * @param formattedEntry The formatted entry (entry with evaluator ID and 
addition/removal information).
-   * @throws IOException
-   */
-  @Override
-  public synchronized void writeToEvaluatorLog(final String formattedEntry) 
throws IOException {
-    final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
-
-    try (
-        final BufferedWriter bw = fileCreated ?
-            new BufferedWriter(new OutputStreamWriter(
-                this.fileSystem.append(this.changelogPath), 
StandardCharsets.UTF_8)) :
-            new BufferedWriter(new OutputStreamWriter(
-                this.fileSystem.create(this.changelogPath), 
StandardCharsets.UTF_8))
-    ) {
-      bw.write(formattedEntry);
-    }
-  }
-
-  /**
-   * Closes the FileSystem.
-   * @throws Exception
-   */
-  @Override
-  public synchronized void close() throws Exception {
-    if (this.fileSystem != null && !this.fsClosed) {
-      this.fileSystem.close();
-      this.fsClosed = true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
new file mode 100644
index 0000000..3518fb5
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The DFS evaluator logger for file systems that do not support append; it emulates append by
+ * overwrite. dfs.support.append should be false.
+ *
+ * Two files are used: the change log itself and a sibling with the suffix ".alt". A write copies
+ * the content of one file plus the new entry into the other file, so the file being read from is
+ * never the one being overwritten. Reads are served from the longer of the two files.
+ */
+@Private
+public final class DFSEvaluatorLogOverwriteReaderWriter implements DFSEvaluatorLogReaderWriter {
+
+  private final FileSystem fileSystem;
+
+  private final DFSLineReader reader;
+  private final Path changeLogPath;
+  private final Path changeLogAltPath;
+
+  // The path the next write goes to. Stays null until the first write inspects the file system.
+  private Path pathToWriteTo = null;
+
+  private boolean fsClosed = false;
+
+  DFSEvaluatorLogOverwriteReaderWriter(final FileSystem fileSystem, final Path changeLogPath) {
+    this.fileSystem = fileSystem;
+    this.changeLogPath = changeLogPath;
+    this.changeLogAltPath = new Path(changeLogPath + ".alt");
+    this.reader = new DFSLineReader(fileSystem);
+  }
+
+  /**
+   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
+   * The log is appended to by reading first, adding on the information, and then overwriting the entire log.
+   * Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter}
+   * uses a two-file approach, where when we write, we always overwrite the older file.
+   * The write target only switches to the other file after a write has completed, so a failed
+   * write is retried on the same file and does not overwrite the last complete copy of the log.
+   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information).
+   * @throws IOException when file cannot be written.
+   */
+  @Override
+  public synchronized void writeToEvaluatorLog(final String formattedEntry) throws IOException {
+    final Path writePath = getWritePath();
+
+    // readPath is always not the writePath.
+    final Path readPath = getAlternativePath(writePath);
+
+    final InputStream newEntryInputStream = new ByteArrayInputStream(
+        formattedEntry.getBytes(StandardCharsets.UTF_8));
+
+    try (final FSDataOutputStream outputStream = this.fileSystem.create(writePath, true);
+         final InputStream inputStream = this.fileSystem.exists(readPath) ?
+             new SequenceInputStream(this.fileSystem.open(readPath), newEntryInputStream) :
+             newEntryInputStream) {
+      // copyBytes must not close the streams: the output has to be open for hsync().
+      // Both streams are closed by try-with-resources afterwards.
+      IOUtils.copyBytes(inputStream, outputStream, 4096, false);
+      outputStream.hsync();
+    }
+
+    // The file just written is now the complete one; the next write overwrites the other file.
+    this.pathToWriteTo = readPath;
+  }
+
+  /**
+   * Since the {@link FileSystem} does not support appends, this {@link DFSEvaluatorLogReaderWriter}
+   * uses a two-file approach, where when we read, we always read from the longer file.
+   * @return the lines of the longer of the two log files.
+   * @throws IOException
+   */
+  @Override
+  public synchronized CloseableIterable<String> readFromEvaluatorLog() throws IOException {
+    return reader.readLinesFromFile(getLongerFile());
+  }
+
+  /**
+   * Gets the alternative path. Returns one of changeLogPath and changeLogAltPath.
+   * @param path one of the two log paths.
+   * @return changeLogAltPath if the given path equals changeLogPath, changeLogPath otherwise.
+   */
+  private synchronized Path getAlternativePath(final Path path) {
+    if (path.equals(changeLogPath)) {
+      return changeLogAltPath;
+    }
+
+    return changeLogPath;
+  }
+
+  /**
+   * Gets the path to write to. On the first call the target is determined from the files that
+   * exist; later calls return the target recorded by the last completed write.
+   */
+  private synchronized Path getWritePath() throws IOException {
+    if (pathToWriteTo == null) {
+      // If we have not yet written before, check existence of files.
+      final boolean originalExists = fileSystem.exists(changeLogPath);
+      final boolean altExists = fileSystem.exists(changeLogAltPath);
+
+      if (originalExists && altExists) {
+        final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath);
+        final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath);
+
+        // Return the shorter file.
+        // TODO[JIRA REEF-1413]: This approach will not be able to work in REEF-1413.
+        // TODO[JIRA REEF-1413]: Note that we cannot use last modified time because Azure blob's HDFS API only
+        // TODO[JIRA REEF-1413]: supports time resolution up to a second.
+        final long originalLen = originalStatus.getLen();
+        final long altLen = altStatus.getLen();
+
+        if (originalLen < altLen) {
+          pathToWriteTo = changeLogPath;
+        } else {
+          pathToWriteTo = changeLogAltPath;
+        }
+      } else if (originalExists) {
+        // Return the file that does not exist.
+        pathToWriteTo = changeLogAltPath;
+      } else {
+        pathToWriteTo = changeLogPath;
+      }
+    }
+
+    return pathToWriteTo;
+  }
+
+  /**
+   * Gets the path to read from: the longer of the two log files. Ties go to the original file,
+   * which matches getWritePath() choosing the alt file as write target when lengths are equal.
+   */
+  private synchronized Path getLongerFile() throws IOException {
+    final boolean originalExists = fileSystem.exists(changeLogPath);
+    final boolean altExists = fileSystem.exists(changeLogAltPath);
+
+    // If both files exist, return the longer file path.
+    if (originalExists && altExists) {
+      final FileStatus originalStatus = fileSystem.getFileStatus(changeLogPath);
+      final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath);
+
+      final long originalLen = originalStatus.getLen();
+      final long altLen = altStatus.getLen();
+
+      // Return the longer file.
+      if (originalLen >= altLen) {
+        return changeLogPath;
+      }
+
+      return changeLogAltPath;
+    } else if (altExists) {
+      // If only the alt file exists, return the alt file path.
+      return changeLogAltPath;
+    }
+
+    // If only the original file exists or if neither exist, return the original file path.
+    return changeLogPath;
+  }
+
+  /**
+   * Closes the FileSystem. Calls after the first successful close do nothing.
+   * @throws Exception
+   */
+  @Override
+  public synchronized void close() throws Exception {
+    if (this.fileSystem != null && !this.fsClosed) {
+      this.fileSystem.close();
+      this.fsClosed = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
deleted file mode 100644
index 5a23231..0000000
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver.restart;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.reef.annotations.audience.Private;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-
-/**
- * The DFS evaluator logger that does not support append and does append by 
overwrite.
- * dfs.support.append should be false.
- */
-@Private
-public final class DFSEvaluatorLogOverwriteWriter implements 
DFSEvaluatorLogWriter {
-
-  private final FileSystem fileSystem;
-
-  private final Path changelogPath;
-
-  private boolean fsClosed = false;
-
-  DFSEvaluatorLogOverwriteWriter(final FileSystem fileSystem, final Path 
changelogPath) {
-    this.fileSystem = fileSystem;
-    this.changelogPath = changelogPath;
-  }
-
-  /**
-   * Writes a formatted entry (addition or removal) for an Evaluator ID into 
the DFS evaluator log.
-   * The log is appended to by reading first, adding on the information, and 
then overwriting the entire
-   * log.
-   * @param formattedEntry The formatted entry (entry with evaluator ID and 
addition/removal information).
-   * @throws IOException when file cannot be written.
-   */
-  @Override
-  public synchronized void writeToEvaluatorLog(final String formattedEntry) 
throws IOException {
-    final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
-
-    if (!fileCreated) {
-      try (final BufferedWriter bw = new BufferedWriter(
-              new 
OutputStreamWriter(this.fileSystem.create(this.changelogPath), 
StandardCharsets.UTF_8))) {
-        bw.write(formattedEntry);
-      }
-    } else {
-      this.appendByDeleteAndCreate(formattedEntry);
-    }
-  }
-
-  /**
-   * For certain HDFS implementation, the append operation may not be 
supported (e.g., Azure blob - wasb)
-   * in this case, we will emulate the append operation by reading the 
content, appending entry at the end,
-   * then recreating the file with appended content.
-   *
-   * @throws java.io.IOException when the file can't be written.
-   */
-  private void appendByDeleteAndCreate(final String appendEntry)
-      throws IOException {
-    final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
-    try (final InputStream inputStream = 
this.fileSystem.open(this.changelogPath)) {
-      IOUtils.copyBytes(inputStream, outputStream, 4096, true);
-    }
-
-    final String newContent = outputStream.toString("UTF-8") + appendEntry;
-    this.fileSystem.delete(this.changelogPath, true);
-
-    try (final FSDataOutputStream newOutput = 
this.fileSystem.create(this.changelogPath);
-         final InputStream newInput = new 
ByteArrayInputStream(newContent.getBytes(StandardCharsets.UTF_8))) {
-      IOUtils.copyBytes(newInput, newOutput, 4096, true);
-    }
-  }
-
-  /**
-   * Closes the FileSystem.
-   * @throws Exception
-   */
-  @Override
-  public synchronized void close() throws Exception {
-    if (this.fileSystem != null && !this.fsClosed) {
-      this.fileSystem.close();
-      this.fsClosed = true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
new file mode 100644
index 0000000..2620e94
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.IOException;
+
+/**
+ * The Evaluator log reader/writer that reads from and writes to the Hadoop DFS.
+ * Currently supports regular append and append by overwrite.
+ */
+@Private
+public interface DFSEvaluatorLogReaderWriter extends AutoCloseable {
+
+  /**
+   * Writes a formatted entry (addition or removal) for an Evaluator ID into the DFS evaluator log.
+   * @param formattedEntry The formatted entry (entry with evaluator ID and addition/removal information)
+   * @throws IOException
+   */
+  void writeToEvaluatorLog(final String formattedEntry) throws IOException;
+
+  /**
+   * Reads the formatted entries (additions and removals) from the DFS evaluator log.
+   * The result is closeable; DFSEvaluatorPreserver iterates it inside a try-with-resources block.
+   * @return an iterable over the lines of the evaluator log.
+   * @throws IOException
+   */
+  CloseableIterable<String> readFromEvaluatorLog() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
deleted file mode 100644
index b5e3881..0000000
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver.restart;
-
-import org.apache.reef.annotations.audience.Private;
-
-import java.io.IOException;
-
-/**
- * The Evaluator log writer that writes to DFS. Currently supports regular 
append and append by overwrite.
- * Actual log entries should be immutable and no entry should ever be deleted. 
To remove an evaluator, a
- * removal entry should be preferred.
- */
-@Private
-public interface DFSEvaluatorLogWriter extends AutoCloseable {
-
-  /**
-   * Writes a formatted entry (addition or removal) for an Evaluator ID into 
the DFS evaluator log.
-   * @param formattedEntry The formatted entry (entry with evaluator ID and 
addition/removal information)
-   * @throws IOException
-   */
-  void writeToEvaluatorLog(final String formattedEntry) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
index 401b672..7be59e8 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
@@ -32,10 +32,10 @@ import 
org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
 import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
 import org.apache.reef.runtime.yarn.util.YarnUtilities;
 import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.CloseableIterable;
 
 import javax.inject.Inject;
 import java.io.*;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
@@ -57,7 +57,7 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
 
   private final boolean failDriverOnEvaluatorLogErrors;
 
-  private DFSEvaluatorLogWriter writer;
+  private DFSEvaluatorLogReaderWriter readerWriter;
 
   private Path changeLogLocation;
 
@@ -88,9 +88,9 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
       boolean appendSupported = config.getBoolean("dfs.support.append", false);
 
       if (appendSupported) {
-        this.writer = new DFSEvaluatorLogAppendWriter(this.fileSystem, 
this.changeLogLocation);
+        this.readerWriter = new 
DFSEvaluatorLogAppendReaderWriter(this.fileSystem, this.changeLogLocation);
       } else {
-        this.writer = new DFSEvaluatorLogOverwriteWriter(this.fileSystem, 
this.changeLogLocation);
+        this.readerWriter = new 
DFSEvaluatorLogOverwriteReaderWriter(this.fileSystem, this.changeLogLocation);
       }
     } catch (final IOException e) {
       final String errMsg = "Cannot read from log file with Exception " + e +
@@ -100,7 +100,7 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
       this.handleException(e, errMsg, fatalMsg);
       this.fileSystem = null;
       this.changeLogLocation = null;
-      this.writer = null;
+      this.readerWriter = null;
     }
   }
 
@@ -123,7 +123,6 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
 
   /**
    * Recovers the set of evaluators that are alive.
-   * @return
    */
   @Override
   public synchronized Set<String> recoverEvaluators() {
@@ -135,14 +134,8 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
         return expectedContainers;
       }
 
-      if (!this.fileSystem.exists(this.changeLogLocation)) {
-        // empty set
-        return expectedContainers;
-      } else {
-        final BufferedReader br = new BufferedReader(
-            new 
InputStreamReader(this.fileSystem.open(this.changeLogLocation), 
StandardCharsets.UTF_8));
-        String line = br.readLine();
-        while (line != null) {
+      try (final CloseableIterable<String> evaluatorLogIterable = 
readerWriter.readFromEvaluatorLog()) {
+        for (final String line : evaluatorLogIterable) {
           if (line.startsWith(ADD_FLAG)) {
             final String containerId = line.substring(ADD_FLAG.length());
             if (expectedContainers.contains(containerId)) {
@@ -159,18 +152,16 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
             }
             expectedContainers.remove(containerId);
           }
-          line = br.readLine();
         }
-        br.close();
       }
-    } catch (final IOException e) {
+    } catch (final Exception e) {
       final String errMsg = "Cannot read from log file with Exception " + e +
           ", evaluators will not be recovered.";
 
       final String fatalMsg = "Cannot read from evaluator log.";
-
       this.handleException(e, errMsg, fatalMsg);
     }
+
     return expectedContainers;
   }
 
@@ -200,7 +191,7 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
 
   private void logContainerChange(final String entry) {
     try {
-      this.writer.writeToEvaluatorLog(entry);
+      this.readerWriter.writeToEvaluatorLog(entry);
     } catch (final IOException e) {
       final String errorMsg = "Unable to log the change of container [" + 
entry +
           "] to the container log. Driver restart won't work properly.";
@@ -232,13 +223,13 @@ public final class DFSEvaluatorPreserver implements 
EvaluatorPreserver, AutoClos
   }
 
   /**
-   * Closes the writer, which in turn closes the FileSystem.
+   * Closes the readerWriter, which in turn closes the FileSystem.
    * @throws Exception
    */
   @Override
   public synchronized void close() throws Exception {
-    if (this.writer != null && !this.writerClosed) {
-      this.writer.close();
+    if (this.readerWriter != null && !this.writerClosed) {
+      this.readerWriter.close();
       this.writerClosed = true;
     }
   }

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
new file mode 100644
index 0000000..d707b12
--- /dev/null
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A reads lines from a {@link Path} on a {@link FileSystem}.
+ * Assumes the file is encoded in {@link StandardCharsets#UTF_8}.
+ */
+final class DFSLineReader {
+
+  private final FileSystem fileSystem;
+
+  DFSLineReader(final FileSystem fileSystem) {
+    this.fileSystem = fileSystem;
+  }
+
+  /**
+   * Reads lines from the specified path.
+   */
+  CloseableIterable<String> readLinesFromFile(final Path path) {
+    return new DFSLineReaderIterable(fileSystem, path);
+  }
+
+  /**
+   * Iterable of DFS file lines.
+   */
+  private final class DFSLineReaderIterable implements 
CloseableIterable<String> {
+
+    private final DFSLineReaderIterator iterator;
+
+    private DFSLineReaderIterable(final FileSystem fileSystem, final Path 
path) {
+      this.iterator = new DFSLineReaderIterator(fileSystem, path);
+    }
+
+    @Override
+    public Iterator<String> iterator() {
+      return iterator;
+    }
+
+    @Override
+    public void close() throws Exception {
+      iterator.close();
+    }
+  }
+
+  /**
+   * Iterator of DFS file lines.
+   */
+  private final class DFSLineReaderIterator implements Iterator<String>, 
AutoCloseable {
+    private final Path path;
+
+    private String line = null;
+    private BufferedReader reader = null;
+
+    private DFSLineReaderIterator(final FileSystem fileSystem, final Path 
path) {
+      this.path = path;
+      try {
+        if (fileSystem.exists(path)) {
+          // Initialize reader and read the first line if the file exists.
+          // Allows hasNext and next to return true and the first line, 
respectively.
+          // If not, reader and line simply remain null, and hasNext will 
return false.
+          this.reader = new BufferedReader(
+              new InputStreamReader(fileSystem.open(path), 
StandardCharsets.UTF_8));
+          this.line = reader.readLine();
+        }
+      } catch (final IOException e) {
+        throw new RuntimeException("Unable to create a reader for file " + 
path + ".", e);
+      }
+    }
+
+    @Override
+    public synchronized boolean hasNext() {
+      return reader != null && line != null;
+    }
+
+    @Override
+    public synchronized String next() {
+      if (!hasNext()) {
+        throw new NoSuchElementException("Unable to retrieve line from file " 
+ path + ".");
+      }
+
+      // Record the line we are currently at to return, and fetch the next 
line.
+      final String retLine = line;
+      try {
+        line = reader.readLine();
+        if (line == null) {
+          reader.close();
+          reader = null;
+        }
+      } catch (final IOException e) {
+        throw new RuntimeException("Error retrieving next line from " + path + 
".", e);
+      }
+
+      return retLine;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove is not supported.");
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
new file mode 100644
index 0000000..31ef3a4
--- /dev/null
+++ 
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+/**
+ * An iterable that is closeable.
+ * Combines {@link Iterable} and {@link AutoCloseable} and declares no methods of its own,
+ * so an instance can be opened in a try-with-resources statement and traversed with a
+ * for-each loop inside it.
+ *
+ * @param <T> the type of the elements returned by the iterator.
+ */
+public interface CloseableIterable<T> extends AutoCloseable, Iterable<T> {
+}

Reply via email to