Repository: reef
Updated Branches:
refs/heads/master 6cde8ce78 -> 16ca43efe
[REEF-1412] Use a two-file approach for DFSEvaluatorLogOverwriteWriter
This addressed the issue by
* Modifying DFSEvaluatorLogAppendWriter to DFSEvaluatorLogAppendReaderWriter
such that it is aware of both read and write file paths.
* Adding helper DFSLineReader to assist in reading DFS files.
* Increasing number of Evaluator requests in HelloRestartDriver to test the
functionality of writing longer files.
JIRA:
[REEF-1412](https://issues.apache.org/jira/browse/REEF-1412)
Pull Request:
This closes #1023
Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/16ca43ef
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/16ca43ef
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/16ca43ef
Branch: refs/heads/master
Commit: 16ca43efe49269e7b01b3b5783e26b09ba1adc19
Parents: 6cde8ce
Author: Andrew Chung <[email protected]>
Authored: Thu Jun 2 11:51:55 2016 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Jun 21 12:43:14 2016 -0700
----------------------------------------------------------------------
.../DriverRestart/HelloRestartDriver.cs | 4 +-
.../DFSEvaluatorLogAppendReaderWriter.java | 86 +++++++++
.../restart/DFSEvaluatorLogAppendWriter.java | 79 --------
.../DFSEvaluatorLogOverwriteReaderWriter.java | 192 +++++++++++++++++++
.../restart/DFSEvaluatorLogOverwriteWriter.java | 104 ----------
.../restart/DFSEvaluatorLogReaderWriter.java | 46 +++++
.../driver/restart/DFSEvaluatorLogWriter.java | 39 ----
.../driver/restart/DFSEvaluatorPreserver.java | 35 ++--
.../yarn/driver/restart/DFSLineReader.java | 136 +++++++++++++
.../org/apache/reef/util/CloseableIterable.java | 25 +++
10 files changed, 500 insertions(+), 246 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
----------------------------------------------------------------------
diff --git
a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
index 8e5c706..d02a614 100644
--- a/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
+++ b/lang/cs/Org.Apache.REEF.Examples/DriverRestart/HelloRestartDriver.cs
@@ -47,8 +47,8 @@ namespace Org.Apache.REEF.Examples.DriverRestart
IObserver<IFailedEvaluator>
{
private static readonly Logger Logger =
Logger.GetLogger(typeof(HelloRestartDriver));
- private const int NumberOfTasksToSubmit = 1;
- private const int NumberOfTasksToSubmitOnRestart = 1;
+ private const int NumberOfTasksToSubmit = 3;
+ private const int NumberOfTasksToSubmitOnRestart = 3;
private readonly IEvaluatorRequestor _evaluatorRequestor;
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
new file mode 100644
index 0000000..d5023e8
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendReaderWriter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The DFS evaluator logger that performs regular append. dfs.support.append
should be true.
+ */
+@Private
+public final class DFSEvaluatorLogAppendReaderWriter implements
DFSEvaluatorLogReaderWriter {
+
+ private final FileSystem fileSystem;
+ private final Path changelogPath;
+ private final DFSLineReader reader;
+
+ private boolean fsClosed = false;
+
+ DFSEvaluatorLogAppendReaderWriter(final FileSystem fileSystem, final Path
changelogPath) {
+ this.fileSystem = fileSystem;
+ this.changelogPath = changelogPath;
+ this.reader = new DFSLineReader(fileSystem);
+ }
+
+ /**
+ * Writes a formatted entry (addition or removal) for an Evaluator ID into
the DFS evaluator log.
+ * The entry is appended regularly by an FS that supports append.
+ * @param formattedEntry The formatted entry (entry with evaluator ID and
addition/removal information).
+ * @throws IOException
+ */
+ @Override
+ public synchronized void writeToEvaluatorLog(final String formattedEntry)
throws IOException {
+ final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
+
+ try (
+ final BufferedWriter bw = fileCreated ?
+ new BufferedWriter(new OutputStreamWriter(
+ this.fileSystem.append(this.changelogPath),
StandardCharsets.UTF_8)) :
+ new BufferedWriter(new OutputStreamWriter(
+ this.fileSystem.create(this.changelogPath),
StandardCharsets.UTF_8))
+ ) {
+ bw.write(formattedEntry);
+ }
+ }
+
+ @Override
+ public CloseableIterable<String> readFromEvaluatorLog() throws IOException {
+ return reader.readLinesFromFile(changelogPath);
+ }
+
+ /**
+ * Closes the FileSystem.
+ * @throws Exception
+ */
+ @Override
+ public synchronized void close() throws Exception {
+ if (this.fileSystem != null && !this.fsClosed) {
+ this.fileSystem.close();
+ this.fsClosed = true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
deleted file mode 100644
index 7d07939..0000000
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogAppendWriter.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver.restart;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.reef.annotations.audience.Private;
-
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.StandardCharsets;
-
-/**
- * The DFS evaluator logger that performs regular append. dfs.support.append
should be true.
- */
-@Private
-public final class DFSEvaluatorLogAppendWriter implements
DFSEvaluatorLogWriter {
-
- private final FileSystem fileSystem;
-
- private final Path changelogPath;
-
- private boolean fsClosed = false;
-
- DFSEvaluatorLogAppendWriter(final FileSystem fileSystem, final Path
changelogPath) {
- this.fileSystem = fileSystem;
- this.changelogPath = changelogPath;
- }
-
- /**
- * Writes a formatted entry (addition or removal) for an Evaluator ID into
the DFS evaluator log.
- * The entry is appended regularly by an FS that supports append.
- * @param formattedEntry The formatted entry (entry with evaluator ID and
addition/removal information).
- * @throws IOException
- */
- @Override
- public synchronized void writeToEvaluatorLog(final String formattedEntry)
throws IOException {
- final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
-
- try (
- final BufferedWriter bw = fileCreated ?
- new BufferedWriter(new OutputStreamWriter(
- this.fileSystem.append(this.changelogPath),
StandardCharsets.UTF_8)) :
- new BufferedWriter(new OutputStreamWriter(
- this.fileSystem.create(this.changelogPath),
StandardCharsets.UTF_8))
- ) {
- bw.write(formattedEntry);
- }
- }
-
- /**
- * Closes the FileSystem.
- * @throws Exception
- */
- @Override
- public synchronized void close() throws Exception {
- if (this.fileSystem != null && !this.fsClosed) {
- this.fileSystem.close();
- this.fsClosed = true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
new file mode 100644
index 0000000..3518fb5
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteReaderWriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.*;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * The DFS evaluator logger that does not support append and does append by
overwrite.
+ * dfs.support.append should be false.
+ */
+@Private
+public final class DFSEvaluatorLogOverwriteReaderWriter implements
DFSEvaluatorLogReaderWriter {
+
+ private final FileSystem fileSystem;
+
+ private final DFSLineReader reader;
+ private final Path changeLogPath;
+ private final Path changeLogAltPath;
+
+ // This is the last path we will be writing to.
+ private Path pathToWriteTo = null;
+
+ private boolean fsClosed = false;
+
+ DFSEvaluatorLogOverwriteReaderWriter(final FileSystem fileSystem, final Path
changeLogPath) {
+ this.fileSystem = fileSystem;
+ this.changeLogPath = changeLogPath;
+ this.changeLogAltPath = new Path(changeLogPath + ".alt");
+ this.reader = new DFSLineReader(fileSystem);
+ }
+
+ /**
+ * Writes a formatted entry (addition or removal) for an Evaluator ID into
the DFS evaluator log.
+ * The log is appended to by reading first, adding on the information, and
then overwriting the entire log.
+ * Since the {@link FileSystem} does not support appends, this {@link
DFSEvaluatorLogReaderWriter}
+ * uses a two-file approach, where when we write, we always overwrite the
older file.
+ * @param formattedEntry The formatted entry (entry with evaluator ID and
addition/removal information).
+ * @throws IOException when file cannot be written.
+ */
+ @Override
+ public synchronized void writeToEvaluatorLog(final String formattedEntry)
throws IOException {
+ final Path writePath = getWritePath();
+
+ // readPath is always not the writePath.
+ final Path readPath = getAlternativePath(writePath);
+
+ try (final FSDataOutputStream outputStream =
this.fileSystem.create(writePath, true)) {
+ InputStream inputStream = null;
+ try {
+ final InputStream newEntryInputStream = new ByteArrayInputStream(
+ formattedEntry.getBytes(StandardCharsets.UTF_8));
+
+ if (fileSystem.exists(readPath)) {
+ inputStream = new SequenceInputStream(
+ this.fileSystem.open(readPath), newEntryInputStream);
+ } else {
+ inputStream = newEntryInputStream;
+ }
+
+ IOUtils.copyBytes(inputStream, outputStream, 4096, true);
+ } finally {
+ outputStream.hsync();
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Since the {@link FileSystem} does not support appends, this {@link
DFSEvaluatorLogReaderWriter}
+ * uses a two-file approach, where when we read, we always read from the
newer file.
+ */
+ @Override
+ public synchronized CloseableIterable<String> readFromEvaluatorLog() throws
IOException {
+ return reader.readLinesFromFile(getLongerFile());
+ }
+
+ /**
+ * Gets the alternative path. Returns one of changeLogPath and
changeLogAltPath.
+ */
+ private synchronized Path getAlternativePath(final Path path) {
+ if (path.equals(changeLogPath)) {
+ return changeLogAltPath;
+ }
+
+ return changeLogPath;
+ }
+
+ /**
+ * Gets the path to write to.
+ */
+ private synchronized Path getWritePath() throws IOException {
+ if (pathToWriteTo == null) {
+ // If we have not yet written before, check existence of files.
+ final boolean originalExists = fileSystem.exists(changeLogPath);
+ final boolean altExists = fileSystem.exists(changeLogAltPath);
+
+ if (originalExists && altExists) {
+ final FileStatus originalStatus =
fileSystem.getFileStatus(changeLogPath);
+ final FileStatus altStatus =
fileSystem.getFileStatus(changeLogAltPath);
+
+ // Return the shorter file.
+ // TODO[JIRA REEF-1413]: This approach will not be able to work in
REEF-1413.
+ // TODO[JIRA REEF-1413]: Note that we cannot use last modified time
because Azure blob's HDFS API only
+ // TODO[JIRA REEF-1413]: supports time resolution up to a second.
+ final long originalLen = originalStatus.getLen();
+ final long altLen = altStatus.getLen();
+
+ if (originalLen < altLen) {
+ pathToWriteTo = changeLogPath;
+ } else {
+ pathToWriteTo = changeLogAltPath;
+ }
+ } else if (originalExists) {
+ // Return the file that does not exist.
+ pathToWriteTo = changeLogAltPath;
+ } else {
+ pathToWriteTo = changeLogPath;
+ }
+ }
+
+ final Path returnPath = pathToWriteTo;
+ pathToWriteTo = getAlternativePath(pathToWriteTo);
+
+ return returnPath;
+ }
+
+ private synchronized Path getLongerFile() throws IOException {
+ final boolean originalExists = fileSystem.exists(changeLogPath);
+ final boolean altExists = fileSystem.exists(changeLogAltPath);
+
+ // If both files exist, return the newest file path.
+ if (originalExists && altExists) {
+ final FileStatus originalStatus =
fileSystem.getFileStatus(changeLogPath);
+ final FileStatus altStatus = fileSystem.getFileStatus(changeLogAltPath);
+
+ final long originalLastModTime = originalStatus.getLen();
+ final long altLastModTime = altStatus.getLen();
+
+ // Return the newer file.
+ if (originalLastModTime >= altLastModTime) {
+ return changeLogPath;
+ }
+
+ return changeLogAltPath;
+ } else if (altExists) {
+ // If only the alt file exists, return the alt file path.
+ return changeLogAltPath;
+ }
+
+ // If only the original file exists or if neither exist, return the
original file path.
+ return changeLogPath;
+ }
+
+ /**
+ * Closes the FileSystem.
+ * @throws Exception
+ */
+ @Override
+ public synchronized void close() throws Exception {
+ if (this.fileSystem != null && !this.fsClosed) {
+ this.fileSystem.close();
+ this.fsClosed = true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
deleted file mode 100644
index 5a23231..0000000
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogOverwriteWriter.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver.restart;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.reef.annotations.audience.Private;
-
-import java.io.*;
-import java.nio.charset.StandardCharsets;
-
-/**
- * The DFS evaluator logger that does not support append and does append by
overwrite.
- * dfs.support.append should be false.
- */
-@Private
-public final class DFSEvaluatorLogOverwriteWriter implements
DFSEvaluatorLogWriter {
-
- private final FileSystem fileSystem;
-
- private final Path changelogPath;
-
- private boolean fsClosed = false;
-
- DFSEvaluatorLogOverwriteWriter(final FileSystem fileSystem, final Path
changelogPath) {
- this.fileSystem = fileSystem;
- this.changelogPath = changelogPath;
- }
-
- /**
- * Writes a formatted entry (addition or removal) for an Evaluator ID into
the DFS evaluator log.
- * The log is appended to by reading first, adding on the information, and
then overwriting the entire
- * log.
- * @param formattedEntry The formatted entry (entry with evaluator ID and
addition/removal information).
- * @throws IOException when file cannot be written.
- */
- @Override
- public synchronized void writeToEvaluatorLog(final String formattedEntry)
throws IOException {
- final boolean fileCreated = this.fileSystem.exists(this.changelogPath);
-
- if (!fileCreated) {
- try (final BufferedWriter bw = new BufferedWriter(
- new
OutputStreamWriter(this.fileSystem.create(this.changelogPath),
StandardCharsets.UTF_8))) {
- bw.write(formattedEntry);
- }
- } else {
- this.appendByDeleteAndCreate(formattedEntry);
- }
- }
-
- /**
- * For certain HDFS implementation, the append operation may not be
supported (e.g., Azure blob - wasb)
- * in this case, we will emulate the append operation by reading the
content, appending entry at the end,
- * then recreating the file with appended content.
- *
- * @throws java.io.IOException when the file can't be written.
- */
- private void appendByDeleteAndCreate(final String appendEntry)
- throws IOException {
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
- try (final InputStream inputStream =
this.fileSystem.open(this.changelogPath)) {
- IOUtils.copyBytes(inputStream, outputStream, 4096, true);
- }
-
- final String newContent = outputStream.toString("UTF-8") + appendEntry;
- this.fileSystem.delete(this.changelogPath, true);
-
- try (final FSDataOutputStream newOutput =
this.fileSystem.create(this.changelogPath);
- final InputStream newInput = new
ByteArrayInputStream(newContent.getBytes(StandardCharsets.UTF_8))) {
- IOUtils.copyBytes(newInput, newOutput, 4096, true);
- }
- }
-
- /**
- * Closes the FileSystem.
- * @throws Exception
- */
- @Override
- public synchronized void close() throws Exception {
- if (this.fileSystem != null && !this.fsClosed) {
- this.fileSystem.close();
- this.fsClosed = true;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
new file mode 100644
index 0000000..2620e94
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogReaderWriter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.reef.annotations.audience.Private;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.IOException;
+
+/**
+ * The Evaluator log reader/writer that reads from and writes to the Hadoop
DFS.
+ * Currently supports regular append and append by overwrite.
+ */
+@Private
+public interface DFSEvaluatorLogReaderWriter extends AutoCloseable {
+
+ /**
+ * Writes a formatted entry (addition or removal) for an Evaluator ID into
the DFS evaluator log.
+ * @param formattedEntry The formatted entry (entry with evaluator ID and
addition/removal information)
+ * @throws IOException
+ */
+ void writeToEvaluatorLog(final String formattedEntry) throws IOException;
+
+ /**
+ * Reads a formatted entry (addition or removal) from the DFS evaluator log.
+ * @return the formatted entry.
+ * @throws IOException
+ */
+ CloseableIterable<String> readFromEvaluatorLog() throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
deleted file mode 100644
index b5e3881..0000000
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorLogWriter.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.reef.runtime.yarn.driver.restart;
-
-import org.apache.reef.annotations.audience.Private;
-
-import java.io.IOException;
-
-/**
- * The Evaluator log writer that writes to DFS. Currently supports regular
append and append by overwrite.
- * Actual log entries should be immutable and no entry should ever be deleted.
To remove an evaluator, a
- * removal entry should be preferred.
- */
-@Private
-public interface DFSEvaluatorLogWriter extends AutoCloseable {
-
- /**
- * Writes a formatted entry (addition or removal) for an Evaluator ID into
the DFS evaluator log.
- * @param formattedEntry The formatted entry (entry with evaluator ID and
addition/removal information)
- * @throws IOException
- */
- void writeToEvaluatorLog(final String formattedEntry) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
index 401b672..7be59e8 100644
---
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSEvaluatorPreserver.java
@@ -32,10 +32,10 @@ import
org.apache.reef.runtime.common.driver.evaluator.EvaluatorManager;
import org.apache.reef.runtime.yarn.driver.parameters.JobSubmissionDirectory;
import org.apache.reef.runtime.yarn.util.YarnUtilities;
import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.CloseableIterable;
import javax.inject.Inject;
import java.io.*;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -57,7 +57,7 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
private final boolean failDriverOnEvaluatorLogErrors;
- private DFSEvaluatorLogWriter writer;
+ private DFSEvaluatorLogReaderWriter readerWriter;
private Path changeLogLocation;
@@ -88,9 +88,9 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
boolean appendSupported = config.getBoolean("dfs.support.append", false);
if (appendSupported) {
- this.writer = new DFSEvaluatorLogAppendWriter(this.fileSystem,
this.changeLogLocation);
+ this.readerWriter = new
DFSEvaluatorLogAppendReaderWriter(this.fileSystem, this.changeLogLocation);
} else {
- this.writer = new DFSEvaluatorLogOverwriteWriter(this.fileSystem,
this.changeLogLocation);
+ this.readerWriter = new
DFSEvaluatorLogOverwriteReaderWriter(this.fileSystem, this.changeLogLocation);
}
} catch (final IOException e) {
final String errMsg = "Cannot read from log file with Exception " + e +
@@ -100,7 +100,7 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
this.handleException(e, errMsg, fatalMsg);
this.fileSystem = null;
this.changeLogLocation = null;
- this.writer = null;
+ this.readerWriter = null;
}
}
@@ -123,7 +123,6 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
/**
* Recovers the set of evaluators that are alive.
- * @return
*/
@Override
public synchronized Set<String> recoverEvaluators() {
@@ -135,14 +134,8 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
return expectedContainers;
}
- if (!this.fileSystem.exists(this.changeLogLocation)) {
- // empty set
- return expectedContainers;
- } else {
- final BufferedReader br = new BufferedReader(
- new
InputStreamReader(this.fileSystem.open(this.changeLogLocation),
StandardCharsets.UTF_8));
- String line = br.readLine();
- while (line != null) {
+ try (final CloseableIterable<String> evaluatorLogIterable =
readerWriter.readFromEvaluatorLog()) {
+ for (final String line : evaluatorLogIterable) {
if (line.startsWith(ADD_FLAG)) {
final String containerId = line.substring(ADD_FLAG.length());
if (expectedContainers.contains(containerId)) {
@@ -159,18 +152,16 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
}
expectedContainers.remove(containerId);
}
- line = br.readLine();
}
- br.close();
}
- } catch (final IOException e) {
+ } catch (final Exception e) {
final String errMsg = "Cannot read from log file with Exception " + e +
", evaluators will not be recovered.";
final String fatalMsg = "Cannot read from evaluator log.";
-
this.handleException(e, errMsg, fatalMsg);
}
+
return expectedContainers;
}
@@ -200,7 +191,7 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
private void logContainerChange(final String entry) {
try {
- this.writer.writeToEvaluatorLog(entry);
+ this.readerWriter.writeToEvaluatorLog(entry);
} catch (final IOException e) {
final String errorMsg = "Unable to log the change of container [" +
entry +
"] to the container log. Driver restart won't work properly.";
@@ -232,13 +223,13 @@ public final class DFSEvaluatorPreserver implements
EvaluatorPreserver, AutoClos
}
/**
- * Closes the writer, which in turn closes the FileSystem.
+ * Closes the readerWriter, which in turn closes the FileSystem.
* @throws Exception
*/
@Override
public synchronized void close() throws Exception {
- if (this.writer != null && !this.writerClosed) {
- this.writer.close();
+ if (this.readerWriter != null && !this.writerClosed) {
+ this.readerWriter.close();
this.writerClosed = true;
}
}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
new file mode 100644
index 0000000..d707b12
--- /dev/null
+++
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/restart/DFSLineReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.yarn.driver.restart;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.reef.util.CloseableIterable;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * A reads lines from a {@link Path} on a {@link FileSystem}.
+ * Assumes the file is encoded in {@link StandardCharsets#UTF_8}.
+ */
+final class DFSLineReader {
+
+ private final FileSystem fileSystem;
+
+ DFSLineReader(final FileSystem fileSystem) {
+ this.fileSystem = fileSystem;
+ }
+
+ /**
+ * Reads lines from the specified path.
+ */
+ CloseableIterable<String> readLinesFromFile(final Path path) {
+ return new DFSLineReaderIterable(fileSystem, path);
+ }
+
+ /**
+ * Iterable of DFS file lines.
+ */
+ private final class DFSLineReaderIterable implements
CloseableIterable<String> {
+
+ private final DFSLineReaderIterator iterator;
+
+ private DFSLineReaderIterable(final FileSystem fileSystem, final Path
path) {
+ this.iterator = new DFSLineReaderIterator(fileSystem, path);
+ }
+
+ @Override
+ public Iterator<String> iterator() {
+ return iterator;
+ }
+
+ @Override
+ public void close() throws Exception {
+ iterator.close();
+ }
+ }
+
+ /**
+ * Iterator of DFS file lines.
+ */
+ private final class DFSLineReaderIterator implements Iterator<String>,
AutoCloseable {
+ private final Path path;
+
+ private String line = null;
+ private BufferedReader reader = null;
+
+ private DFSLineReaderIterator(final FileSystem fileSystem, final Path
path) {
+ this.path = path;
+ try {
+ if (fileSystem.exists(path)) {
+ // Initialize reader and read the first line if the file exists.
+ // Allows hasNext and next to return true and the first line,
respectively.
+ // If not, reader and line simply remain null, and hasNext will
return false.
+ this.reader = new BufferedReader(
+ new InputStreamReader(fileSystem.open(path),
StandardCharsets.UTF_8));
+ this.line = reader.readLine();
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("Unable to create a reader for file " +
path + ".", e);
+ }
+ }
+
+ @Override
+ public synchronized boolean hasNext() {
+ return reader != null && line != null;
+ }
+
+ @Override
+ public synchronized String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException("Unable to retrieve line from file "
+ path + ".");
+ }
+
+ // Record the line we are currently at to return, and fetch the next
line.
+ final String retLine = line;
+ try {
+ line = reader.readLine();
+ if (line == null) {
+ reader.close();
+ reader = null;
+ }
+ } catch (final IOException e) {
+ throw new RuntimeException("Error retrieving next line from " + path +
".", e);
+ }
+
+ return retLine;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("remove is not supported.");
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/reef/blob/16ca43ef/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
----------------------------------------------------------------------
diff --git
a/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
new file mode 100644
index 0000000..31ef3a4
--- /dev/null
+++
b/lang/java/reef-utils/src/main/java/org/apache/reef/util/CloseableIterable.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+/**
+ * An iterable that is closeable.
+ */
+public interface CloseableIterable<T> extends AutoCloseable, Iterable<T> {
+}