Made HDFS::copyToLocal asynchronous.

Review: https://reviews.apache.org/r/40945/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d5e42af
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d5e42af
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d5e42af

Branch: refs/heads/master
Commit: 3d5e42af8d4cb3d1d03db505b0140ff35f37d912
Parents: 905d5fb
Author: Jie Yu <[email protected]>
Authored: Mon Dec 14 11:42:46 2015 -0800
Committer: Jie Yu <[email protected]>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/hdfs/hdfs.cpp        | 35 ++++++++++++++++++++++++-----------
 src/hdfs/hdfs.hpp        |  4 +++-
 src/launcher/fetcher.cpp |  9 ++++++---
 3 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index 3c3f867..2b7a58e 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -272,22 +272,35 @@ Future<Nothing> HDFS::copyFromLocal(const string& from, 
const string& to)
 }
 
 
-Try<Nothing> HDFS::copyToLocal(const string& _from, const string& to)
+Future<Nothing> HDFS::copyToLocal(const string& from, const string& to)
 {
-  const string from = absolutePath(_from);
-
-  Try<string> command = strings::format(
-      "%s fs -copyToLocal '%s' '%s'", hadoop, from, to);
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-copyToLocal", absolutePath(from), to},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  CHECK_SOME(command);
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
+  }
 
-  Try<string> out = os::shell(command.get());
+  return result(s.get())
+    .then([](const CommandResult& result) -> Future<Nothing> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
 
-  if (out.isError()) {
-    return Error(out.error());
-  }
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
+      }
 
-  return Nothing();
+      return Nothing();
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index efe6e1d..24d3ffc 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -57,7 +57,9 @@ public:
       const std::string& from,
       const std::string& to);
 
-  Try<Nothing> copyToLocal(const std::string& from, const std::string& to);
+  process::Future<Nothing> copyToLocal(
+      const std::string& from,
+      const std::string& to);
 
 private:
   explicit HDFS(const std::string& _hadoop)

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index eafdda3..0ff8598 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -109,9 +109,12 @@ static Try<string> downloadWithHadoopClient(
   LOG(INFO) << "Downloading resource with Hadoop client from '" << sourceUri
             << "' to '" << destinationPath << "'";
 
-  Try<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
-  if (result.isError()) {
-    return Error("HDFS copyToLocal failed: " + result.error());
+  Future<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
+  result.await();
+
+  if (!result.isReady()) {
+    return Error("HDFS copyToLocal failed: " +
+                 (result.isFailed() ? result.failure() : "discarded"));
   }
 
   return destinationPath;

Reply via email to