Made HDFS::copyFromLocal asynchronous. Review: https://reviews.apache.org/r/40943/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/905d5fb4 Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/905d5fb4 Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/905d5fb4 Branch: refs/heads/master Commit: 905d5fb4dc433c9922fb3965a1bb4c40805f3b92 Parents: c08d348 Author: Jie Yu <[email protected]> Authored: Mon Dec 14 11:42:36 2015 -0800 Committer: Jie Yu <[email protected]> Committed: Tue Dec 15 10:55:33 2015 -0800 ---------------------------------------------------------------------- src/cli/execute.cpp | 9 ++++++--- src/hdfs/hdfs.cpp | 37 +++++++++++++++++++++++++------------ src/hdfs/hdfs.hpp | 6 +++++- 3 files changed, 36 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/cli/execute.cpp ---------------------------------------------------------------------- diff --git a/src/cli/execute.cpp b/src/cli/execute.cpp index fec2ad4..a2b610f 100644 --- a/src/cli/execute.cpp +++ b/src/cli/execute.cpp @@ -408,9 +408,12 @@ int main(int argc, char** argv) return EXIT_FAILURE; } - Try<Nothing> copy = hdfs.get()->copyFromLocal(flags.package.get(), path); - if (copy.isError()) { - cerr << "Failed to copy package: " << copy.error() << endl; + Future<Nothing> copy = hdfs.get()->copyFromLocal(flags.package.get(), path); + copy.await(); + + if (!copy.isReady()) { + cerr << "Failed to copy package: " + << (copy.isFailed() ? copy.failure() : "discarded") << endl; return EXIT_FAILURE; } http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/hdfs/hdfs.cpp ---------------------------------------------------------------------- diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp index ed7ab63..3c3f867 100644 --- a/src/hdfs/hdfs.cpp +++ b/src/hdfs/hdfs.cpp @@ -236,26 +236,39 @@ Future<Nothing> HDFS::rm(const string& path) } -Try<Nothing> HDFS::copyFromLocal(const string& from, const string& _to) +Future<Nothing> HDFS::copyFromLocal(const string& from, const string& to) { if (!os::exists(from)) { - return Error("Failed to find " + from); + return Failure("Failed to find '" + from + "'"); } - const string to = absolutePath(_to); - - Try<string> command = strings::format( - "%s fs -copyFromLocal '%s' '%s'", hadoop, from, to); + Try<Subprocess> s = subprocess( + hadoop, + {"hadoop", "fs", "-copyFromLocal", from, absolutePath(to)}, + Subprocess::PATH("/dev/null"), + Subprocess::PIPE(), + Subprocess::PIPE()); - CHECK_SOME(command); + if (s.isError()) { + return Failure("Failed to execute the subprocess: " + s.error()); + } - Try<string> out = os::shell(command.get()); + return result(s.get()) + .then([](const CommandResult& result) -> Future<Nothing> { + if (result.status.isNone()) { + return Failure("Failed to reap the subprocess"); + } - if (out.isError()) { - return Error(out.error()); - } + if (result.status.get() != 0) { + return Failure( + "Unexpected result from the subprocess: " + "status='" + stringify(result.status.get()) + "', " + + "stdout='" + result.out + "', " + + "stderr='" + result.err + "'"); + } - return Nothing(); + return Nothing(); + }); } http://git-wip-us.apache.org/repos/asf/mesos/blob/905d5fb4/src/hdfs/hdfs.hpp ---------------------------------------------------------------------- diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp index 13ec02c..efe6e1d 100644 --- a/src/hdfs/hdfs.hpp +++ b/src/hdfs/hdfs.hpp @@ -52,7 +52,11 @@ public: process::Future<bool> exists(const std::string& path); Try<Bytes> du(const std::string& path); process::Future<Nothing> rm(const std::string& path); - Try<Nothing> copyFromLocal(const std::string& from, const std::string& to); + + process::Future<Nothing> copyFromLocal( + const std::string& from, + const std::string& to); + Try<Nothing> copyToLocal(const std::string& from, const std::string& to); private:
