Made HDFS::copyFromLocal asynchronous.

Review: https://reviews.apache.org/r/40943/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/905d5fb4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/905d5fb4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/905d5fb4

Branch: refs/heads/master
Commit: 905d5fb4dc433c9922fb3965a1bb4c40805f3b92
Parents: c08d348
Author: Jie Yu <[email protected]>
Authored: Mon Dec 14 11:42:36 2015 -0800
Committer: Jie Yu <[email protected]>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/cli/execute.cpp |  9 ++++++---
 src/hdfs/hdfs.cpp   | 37 +++++++++++++++++++++++++------------
 src/hdfs/hdfs.hpp   |  6 +++++-
 3 files changed, 36 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/cli/execute.cpp
----------------------------------------------------------------------
diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp
index fec2ad4..a2b610f 100644
--- a/src/cli/execute.cpp
+++ b/src/cli/execute.cpp
@@ -408,9 +408,12 @@ int main(int argc, char** argv)
       return EXIT_FAILURE;
     }
 
-    Try<Nothing> copy = hdfs.get()->copyFromLocal(flags.package.get(), path);
-    if (copy.isError()) {
-      cerr << "Failed to copy package: " << copy.error() << endl;
+    Future<Nothing> copy = hdfs.get()->copyFromLocal(flags.package.get(), path);
+    copy.await();
+
+    if (!copy.isReady()) {
+      cerr << "Failed to copy package: "
+           << (copy.isFailed() ? copy.failure() : "discarded") << endl;
       return EXIT_FAILURE;
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index ed7ab63..3c3f867 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -236,26 +236,39 @@ Future<Nothing> HDFS::rm(const string& path)
 }
 
 
-Try<Nothing> HDFS::copyFromLocal(const string& from, const string& _to)
+Future<Nothing> HDFS::copyFromLocal(const string& from, const string& to)
 {
   if (!os::exists(from)) {
-    return Error("Failed to find " + from);
+    return Failure("Failed to find '" + from + "'");
   }
 
-  const string to = absolutePath(_to);
-
-  Try<string> command = strings::format(
-      "%s fs -copyFromLocal '%s' '%s'", hadoop, from, to);
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-copyFromLocal", from, absolutePath(to)},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  CHECK_SOME(command);
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
+  }
 
-  Try<string> out = os::shell(command.get());
+  return result(s.get())
+    .then([](const CommandResult& result) -> Future<Nothing> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
 
-  if (out.isError()) {
-    return Error(out.error());
-  }
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
+      }
 
-  return Nothing();
+      return Nothing();
+    });
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index 13ec02c..efe6e1d 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -52,7 +52,11 @@ public:
   process::Future<bool> exists(const std::string& path);
   Try<Bytes> du(const std::string& path);
   process::Future<Nothing> rm(const std::string& path);
-  Try<Nothing> copyFromLocal(const std::string& from, const std::string& to);
+
+  process::Future<Nothing> copyFromLocal(
+      const std::string& from,
+      const std::string& to);
+
   Try<Nothing> copyToLocal(const std::string& from, const std::string& to);
 
 private:

Reply via email to