Made HDFS::du asynchronous. Review: https://reviews.apache.org/r/40946/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/00f6693a Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/00f6693a Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/00f6693a Branch: refs/heads/master Commit: 00f6693a6d9d71d122cee066a9cbb5d25bb67e1a Parents: 3d5e42a Author: Jie Yu <[email protected]> Authored: Mon Dec 14 11:42:58 2015 -0800 Committer: Jie Yu <[email protected]> Committed: Tue Dec 15 10:55:33 2015 -0800 ---------------------------------------------------------------------- src/hdfs/hdfs.cpp | 72 +++++++++++++++++--------------- src/hdfs/hdfs.hpp | 2 +- src/slave/containerizer/fetcher.cpp | 9 ++-- 3 files changed, 46 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.cpp ---------------------------------------------------------------------- diff --git a/src/hdfs/hdfs.cpp b/src/hdfs/hdfs.cpp index 2b7a58e..51f016b 100644 --- a/src/hdfs/hdfs.cpp +++ b/src/hdfs/hdfs.cpp @@ -159,48 +159,54 @@ Future<bool> HDFS::exists(const string& path) } -Try<Bytes> HDFS::du(const string& _path) +Future<Bytes> HDFS::du(const string& _path) { const string path = absolutePath(_path); - Try<string> command = strings::format( - "%s fs -du '%s'", hadoop, path); - - CHECK_SOME(command); - - // We are piping stderr to stdout so that we can see the error (if - // any) in the logs emitted by `os::shell()` in case of failure. - // - // TODO(marco): this was the existing logic, but not sure it is - // actually needed. 
- Try<string> out = os::shell(command.get() + " 2>&1"); + Try<Subprocess> s = subprocess( + hadoop, + {"hadoop", "fs", "-du", path}, + Subprocess::PATH("/dev/null"), + Subprocess::PIPE(), + Subprocess::PIPE()); - if (out.isError()) { - return Error("HDFS du failed: " + out.error()); + if (s.isError()) { + return Failure("Failed to execute the subprocess: " + s.error()); } - // We expect 2 space-separated output fields; a number of bytes then - // the name of the path we gave. The 'hadoop' command can emit - // various WARN or other log messages, so we make an effort to scan - // for the field we want. - foreach (const string& line, strings::tokenize(out.get(), "\n")) { - // Note that we use tokenize() rather than split() since fields - // can be delimited by multiple spaces. - vector<string> fields = strings::tokenize(line, " "); - - if (fields.size() == 2 && fields[1] == path) { - Result<size_t> size = numify<size_t>(fields[0]); - if (size.isError()) { - return Error("HDFS du returned unexpected format: " + size.error()); - } else if (size.isNone()) { - return Error("HDFS du returned unexpected format"); + return result(s.get()) + .then([path](const CommandResult& result) -> Future<Bytes> { + if (result.status.isNone()) { + return Failure("Failed to reap the subprocess"); + } + + if (result.status.get() != 0) { + return Failure( + "Unexpected result from the subprocess: " + "status='" + stringify(result.status.get()) + "', " + + "stdout='" + result.out + "', " + + "stderr='" + result.err + "'"); } - return Bytes(size.get()); - } - } + // We expect 2 space-separated output fields; a number of bytes + // then the name of the path we gave. The 'hadoop' command can + // emit various WARN or other log messages, so we make an effort + // to scan for the field we want. + foreach (const string& line, strings::tokenize(result.out, "\n")) { + // Note that we use tokenize() rather than split() since + // fields can be delimited by multiple spaces. 
+ vector<string> fields = strings::tokenize(line, " \t"); + + if (fields.size() == 2 && fields[1] == path) { + Result<size_t> size = numify<size_t>(fields[0]); + if (size.isSome()) { + return Bytes(size.get()); + } + } + } - return Error("HDFS du returned an unexpected format: '" + out.get() + "'"); + return Failure("Unexpected output format: '" + result.out + "'"); + }); } http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/hdfs/hdfs.hpp ---------------------------------------------------------------------- diff --git a/src/hdfs/hdfs.hpp b/src/hdfs/hdfs.hpp index 24d3ffc..abdb9b9 100644 --- a/src/hdfs/hdfs.hpp +++ b/src/hdfs/hdfs.hpp @@ -50,7 +50,7 @@ public: const Option<std::string>& hadoop = None()); process::Future<bool> exists(const std::string& path); - Try<Bytes> du(const std::string& path); + process::Future<Bytes> du(const std::string& path); process::Future<Nothing> rm(const std::string& path); process::Future<Nothing> copyFromLocal( http://git-wip-us.apache.org/repos/asf/mesos/blob/00f6693a/src/slave/containerizer/fetcher.cpp ---------------------------------------------------------------------- diff --git a/src/slave/containerizer/fetcher.cpp b/src/slave/containerizer/fetcher.cpp index e479bd3..4ac9149 100644 --- a/src/slave/containerizer/fetcher.cpp +++ b/src/slave/containerizer/fetcher.cpp @@ -281,9 +281,12 @@ static Try<Bytes> fetchSize( return Error("Failed to create HDFS client: " + hdfs.error()); } - Try<Bytes> size = hdfs.get()->du(uri); - if (size.isError()) { - return Error("Hadoop client could not determine size: " + size.error()); + Future<Bytes> size = hdfs.get()->du(uri); + size.await(); + + if (!size.isReady()) { + return Error("Hadoop client could not determine size: " + + (size.isFailed() ? size.failure() : "discarded")); } return size.get();
