Made HDFS::copyToLocal asynchronous. Review: https://reviews.apache.org/r/40945/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3d5e42af
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3d5e42af
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3d5e42af

Branch: refs/heads/master
Commit: 3d5e42af8d4cb3d1d03db505b0140ff35f37d912
Parents: 905d5fb
Author: Jie Yu <[email protected]>
Authored: Mon Dec 14 11:42:46 2015 -0800
Committer: Jie Yu <[email protected]>
Committed: Tue Dec 15 10:55:33 2015 -0800

----------------------------------------------------------------------
 src/hdfs/hdfs.cpp        | 35 ++++++++++++++++++++++++-----------
 src/hdfs/hdfs.hpp        |  4 +++-
 src/launcher/fetcher.cpp |  9 ++++++---
 3 files changed, 33 insertions(+), 15 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/hdfs/hdfs.cpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp
index 3c3f867..2b7a58e 100644
--- a/src/hdfs/hdfs.cpp
+++ b/src/hdfs/hdfs.cpp
@@ -272,22 +272,35 @@ Future<Nothing> HDFS::copyFromLocal(const string& from, const string& to)
 }
 
 
-Try<Nothing> HDFS::copyToLocal(const string& _from, const string& to)
+Future<Nothing> HDFS::copyToLocal(const string& from, const string& to)
 {
-  const string from = absolutePath(_from);
-
-  Try<string> command = strings::format(
-      "%s fs -copyToLocal '%s' '%s'", hadoop, from, to);
+  Try<Subprocess> s = subprocess(
+      hadoop,
+      {"hadoop", "fs", "-copyToLocal", absolutePath(from), to},
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE());
 
-  CHECK_SOME(command);
+  if (s.isError()) {
+    return Failure("Failed to execute the subprocess: " + s.error());
+  }
 
-  Try<string> out = os::shell(command.get());
+  return result(s.get())
+    .then([](const CommandResult& result) -> Future<Nothing> {
+      if (result.status.isNone()) {
+        return Failure("Failed to reap the subprocess");
+      }
 
-  if (out.isError()) {
-    return Error(out.error());
-  }
+      if (result.status.get() != 0) {
+        return Failure(
+            "Unexpected result from the subprocess: "
+            "status='" + stringify(result.status.get()) + "', " +
+            "stdout='" + result.out + "', " +
+            "stderr='" + result.err + "'");
+      }
 
-  return Nothing();
+      return Nothing();
+    });
 }
 
 
http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/hdfs/hdfs.hpp
----------------------------------------------------------------------
diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp
index efe6e1d..24d3ffc 100644
--- a/src/hdfs/hdfs.hpp
+++ b/src/hdfs/hdfs.hpp
@@ -57,7 +57,9 @@ public:
       const std::string& from,
       const std::string& to);
 
-  Try<Nothing> copyToLocal(const std::string& from, const std::string& to);
+  process::Future<Nothing> copyToLocal(
+      const std::string& from,
+      const std::string& to);
 
 private:
   explicit HDFS(const std::string& _hadoop)

http://git-wip-us.apache.org/repos/asf/mesos/blob/3d5e42af/src/launcher/fetcher.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/fetcher.cpp b/src/launcher/fetcher.cpp
index eafdda3..0ff8598 100644
--- a/src/launcher/fetcher.cpp
+++ b/src/launcher/fetcher.cpp
@@ -109,9 +109,12 @@ static Try<string> downloadWithHadoopClient(
   LOG(INFO) << "Downloading resource with Hadoop client from '" << sourceUri
             << "' to '" << destinationPath << "'";
 
-  Try<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
-  if (result.isError()) {
-    return Error("HDFS copyToLocal failed: " + result.error());
+  Future<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
+  result.await();
+
+  if (!result.isReady()) {
+    return Error("HDFS copyToLocal failed: " +
+                 (result.isFailed() ? result.failure() : "discarded"));
   }
 
   return destinationPath;
