HDFS-10754: libhdfs++: Create tools directory and implement hdfs_cat, hdfs_chgrp, hdfs_chown, hdfs_chmod and hdfs_find. Contributed by Anatoli Shein.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4f6cb5d1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4f6cb5d1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4f6cb5d1 Branch: refs/heads/trunk Commit: 4f6cb5d1a150b366516ddfc3409925c26a1a2a6a Parents: 05ddb31 Author: James <[email protected]> Authored: Fri Aug 26 12:52:05 2016 -0400 Committer: James Clampffer <[email protected]> Committed: Thu Mar 22 17:19:47 2018 -0400 ---------------------------------------------------------------------- .../src/main/native/libhdfspp/CMakeLists.txt | 1 + .../libhdfspp/examples/cpp/CMakeLists.txt | 3 +- .../native/libhdfspp/examples/cpp/cat/cat.cpp | 126 +++++----- .../libhdfspp/examples/cpp/find/CMakeLists.txt | 35 +++ .../native/libhdfspp/examples/cpp/find/find.cpp | 162 ++++++++++++ .../libhdfspp/examples/cpp/gendirs/gendirs.cpp | 116 ++++----- .../native/libhdfspp/include/hdfspp/hdfs_ext.h | 14 ++ .../native/libhdfspp/include/hdfspp/hdfspp.h | 45 +++- .../native/libhdfspp/include/hdfspp/statinfo.h | 1 + .../native/libhdfspp/lib/bindings/c/hdfs.cc | 64 +++-- .../lib/common/configuration_loader.cc | 8 +- .../libhdfspp/lib/common/configuration_loader.h | 8 +- .../lib/common/configuration_loader_impl.h | 2 +- .../main/native/libhdfspp/lib/fs/filesystem.cc | 244 +++++++++++++++++-- .../main/native/libhdfspp/lib/fs/filesystem.h | 68 +++++- .../libhdfspp/lib/fs/namenode_operations.cc | 82 ++----- .../libhdfspp/lib/fs/namenode_operations.h | 8 +- .../libhdfspp/tests/configuration_test.cc | 84 +++++-- .../native/libhdfspp/tests/configuration_test.h | 4 +- .../libhdfspp/tests/hdfs_configuration_test.cc | 9 +- .../src/main/native/libhdfspp/tests/hdfs_shim.c | 4 + .../libhdfspp/tests/libhdfs_wrapper_undefs.h | 1 + .../libhdfspp/tests/libhdfspp_wrapper_defines.h | 1 + .../main/native/libhdfspp/tools/CMakeLists.txt | 42 ++++ .../main/native/libhdfspp/tools/hdfs_cat.cpp | 120 +++++++++ 
.../main/native/libhdfspp/tools/hdfs_chgrp.cpp | 196 +++++++++++++++ .../main/native/libhdfspp/tools/hdfs_chmod.cpp | 194 +++++++++++++++ .../main/native/libhdfspp/tools/hdfs_chown.cpp | 206 ++++++++++++++++ .../main/native/libhdfspp/tools/hdfs_find.cpp | 156 ++++++++++++ .../native/libhdfspp/tools/tools_common.cpp | 70 ++++++ .../main/native/libhdfspp/tools/tools_common.h | 39 +++ 31 files changed, 1829 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt index a663e60..da56403 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/CMakeLists.txt @@ -163,6 +163,7 @@ add_subdirectory(third_party/uriparser2) add_subdirectory(lib) add_subdirectory(tests) add_subdirectory(examples) +add_subdirectory(tools) # create an empty file; hadoop_add_dual_library wraps add_library which # requires at least one file as an argument http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt index 183299a..9e16b0b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/CMakeLists.txt @@ -17,4 +17,5 @@ # add_subdirectory(cat) -add_subdirectory(gendirs) \ No newline at end of file +add_subdirectory(gendirs) +add_subdirectory(find) http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp index bfab507..17626ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/cat/cat.cpp @@ -17,105 +17,91 @@ under the License. */ -/* - A a stripped down version of unix's "cat". - Doesn't deal with any flags for now, will just attempt to read the whole file. -*/ +/** + * Unix-like cat tool example. + * + * Reads the specified file from HDFS and outputs to stdout. + * + * Usage: cat /<path-to-file> + * + * Example: cat /dir/file + * + * @param path-to-file Absolute path to the file to read. 
+ * + **/ #include "hdfspp/hdfspp.h" #include "common/hdfs_configuration.h" #include "common/configuration_loader.h" -#include "common/uri.h" - -#include <google/protobuf/io/coded_stream.h> -using namespace std; -using namespace hdfs; +#include <google/protobuf/stubs/common.h> -#define SCHEME "hdfs" +#define BUF_SIZE 4096 int main(int argc, char *argv[]) { if (argc != 2) { - cerr << "usage: cat [hdfs://[<hostname>:<port>]]/<path-to-file>" << endl; - return 1; - } - - optional<URI> uri; - const string uri_path = argv[1]; - - //Separate check for scheme is required, otherwise common/uri.h library causes memory issues under valgrind - size_t scheme_end = uri_path.find("://"); - if (scheme_end != string::npos) { - if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) { - cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl; - return 1; - } else { - uri = URI::parse_from_string(uri_path); - } - } - if (!uri) { - cerr << "Malformed URI: " << uri_path << endl; - return 1; + std::cerr << "usage: cat /<path-to-file>" << std::endl; + exit(EXIT_FAILURE); } - - ConfigurationLoader loader; - optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>(); - const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR"); - if (envHadoopConfDir && (*envHadoopConfDir != 0) ) { - config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml"); - } - - Options options; + std::string path = argv[1]; + + hdfs::Options options; + //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. 
if(config){ + //Loading options from the config options = config->GetOptions(); } - - IoService * io_service = IoService::New(); - - FileSystem *fs_raw = FileSystem::New(io_service, "", options); - if (!fs_raw) { - cerr << "Could not create FileSystem object" << endl; - return 1; + hdfs::IoService * io_service = hdfs::IoService::New(); + //Wrapping fs into a shared pointer to guarantee deletion + std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options)); + if (!fs) { + std::cerr << "Could not connect the file system." << std::endl; + exit(EXIT_FAILURE); } - //wrapping fs_raw into a unique pointer to guarantee deletion - unique_ptr<FileSystem> fs(fs_raw); - - Status stat = fs->Connect(uri->get_host(), to_string(*(uri->get_port()))); - if (!stat.ok()) { - cerr << "Could not connect to " << uri->get_host() << ":" << *(uri->get_port()) << endl; - return 1; + hdfs::Status status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl; + } + exit(EXIT_FAILURE); } - FileHandle *file_raw = nullptr; - stat = fs->Open(uri->get_path(), &file_raw); - if (!stat.ok()) { - cerr << "Could not open file " << uri->get_path() << endl; - return 1; + hdfs::FileHandle *file_raw = nullptr; + status = fs->Open(path, &file_raw); + if (!status.ok()) { + std::cerr << "Could not open file " << path << ". 
" << status.ToString() << std::endl; + exit(EXIT_FAILURE); } //wrapping file_raw into a unique pointer to guarantee deletion - unique_ptr<FileHandle> file(file_raw); + std::unique_ptr<hdfs::FileHandle> file(file_raw); - char input_buffer[4096]; - ssize_t read_bytes_count = 0; - size_t last_read_bytes = 0; + char input_buffer[BUF_SIZE]; + ssize_t total_bytes_read = 0; + size_t last_bytes_read = 0; do{ //Reading file chunks - Status stat = file->PositionRead(input_buffer, sizeof(input_buffer), read_bytes_count, &last_read_bytes); - if(stat.ok()) { + status = file->Read(input_buffer, sizeof(input_buffer), &last_bytes_read); + if(status.ok()) { //Writing file chunks to stdout - fwrite(input_buffer, last_read_bytes, 1, stdout); - read_bytes_count += last_read_bytes; + fwrite(input_buffer, last_bytes_read, 1, stdout); + total_bytes_read += last_bytes_read; } else { - if(stat.is_invalid_offset()){ + if(status.is_invalid_offset()){ //Reached the end of the file break; } else { - cerr << "Error reading the file: " << stat.ToString() << endl; - return 1; + std::cerr << "Error reading the file: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); } } - } while (last_read_bytes > 0); + } while (last_bytes_read > 0); // Clean up static data and prevent valgrind memory leaks google::protobuf::ShutdownProtobufLibrary(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt new file mode 100644 index 0000000..7ae27c5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/CMakeLists.txt @@ -0,0 +1,35 @@ +# +# Licensed to 
the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Default LIBHDFSPP_DIR to the default install location. You can override +# it by add -DLIBHDFSPP_DIR=... to your cmake invocation +set(LIBHDFSPP_DIR CACHE STRING ${CMAKE_INSTALL_PREFIX}) + +include_directories( ${LIBHDFSPP_DIR}/include ) +link_directories( ${LIBHDFSPP_DIR}/lib ) + +add_executable(find_cpp find.cpp) +target_link_libraries(find_cpp hdfspp) + +# Several examples in different languages need to produce executables with +# same names. 
To allow executables with same names we keep their CMake +# names different, but specify their executable names as follows: +set_target_properties( find_cpp + PROPERTIES + OUTPUT_NAME "find" +) http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp new file mode 100644 index 0000000..21a731b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/find/find.cpp @@ -0,0 +1,162 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +*/ + +/** + * A parallel find tool example. + * + * Finds all files matching the specified name recursively starting from the + * specified directory and prints their filepaths. Works either synchronously + * or asynchronously. 
+ * + * Usage: find /<path-to-file> <file-name> <use_async> + * + * Example: find /dir?/tree* some?file*name 1 + * + * @param path-to-file Absolute path at which to begin search, can have wild + * cards and must be non-blank + * @param file-name Name to find, can have wild cards and must be non-blank + * @param use_async If set to 1 it prints out results asynchronously as + * they arrive. If set to 0 results are printed in one + * big chunk when it becomes available. + * + **/ + +#include "hdfspp/hdfspp.h" +#include "common/hdfs_configuration.h" +#include "common/configuration_loader.h" + +#include <google/protobuf/stubs/common.h> +#include <future> + +void SyncFind(std::shared_ptr<hdfs::FileSystem> fs, const std::string &path, const std::string &name){ + std::vector<hdfs::StatInfo> results; + //Synchronous call to Find + hdfs::Status stat = fs->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &results); + + if (!stat.ok()) { + std::cerr << "Error: " << stat.ToString() << std::endl; + } + + if(results.empty()){ + std::cout << "Nothing Found" << std::endl; + } else { + //Printing out the results + for (hdfs::StatInfo const& si : results) { + std::cout << si.full_path << std::endl; + } + } +} + +void AsyncFind(std::shared_ptr<hdfs::FileSystem> fs, const std::string &path, const std::string &name){ + std::promise<void> promise; + std::future<void> future(promise.get_future()); + bool something_found = false; + hdfs::Status status = hdfs::Status::OK(); + + /** + * Keep requesting more until we get the entire listing. Set the promise + * when we have the entire listing to stop. 
+ * + * Find guarantees that the handler will only be called once at a time, + * so we do not need any locking here + */ + auto handler = [&promise, &status, &something_found] + (const hdfs::Status &s, const std::vector<hdfs::StatInfo> & si, bool has_more_results) -> bool { + //Print result chunks as they arrive + if(!si.empty()) { + something_found = true; + for (hdfs::StatInfo const& s : si) { + std::cout << s.full_path << std::endl; + } + } + if(!s.ok() && status.ok()){ + //We make sure we set 'status' only on the first error. + status = s; + } + if (!has_more_results) { + promise.set_value(); //set promise + return false; //request stop sending results + } + return true; //request more results + }; + + //Asynchronous call to Find + fs->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), handler); + + //block until promise is set + future.get(); + if(!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + } + if(!something_found){ + std::cout << "Nothing Found" << std::endl; + } +} + +int main(int argc, char *argv[]) { + if (argc != 4) { + std::cerr << "usage: find /<path-to-file> <file-name> <use_async>" << std::endl; + exit(EXIT_FAILURE); + } + + std::string path = argv[1]; + std::string name = argv[2]; + bool use_async = (std::stoi(argv[3]) != 0); + + hdfs::Options options; + //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. 
+ if(config){ + //Loading options from the config + options = config->GetOptions(); + } + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits<int>::max(); + hdfs::IoService * io_service = hdfs::IoService::New(); + //Wrapping fs into a shared pointer to guarantee deletion + std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options)); + if (!fs) { + std::cerr << "Could not connect the file system." << std::endl; + exit(EXIT_FAILURE); + } + hdfs::Status status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl; + } + exit(EXIT_FAILURE); + } + + if (use_async){ + //Example of Async find + AsyncFind(fs, path, name); + } else { + //Example of Sync find + SyncFind(fs, path, name); + } + + // Clean up static data and prevent valgrind memory leaks + google::protobuf::ShutdownProtobufLibrary(); + return 0; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp index 9f5ae5a..c90abbd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/examples/cpp/gendirs/gendirs.cpp @@ -23,9 +23,9 @@ * Generates a directory tree with specified depth and fanout starting from * 
a given path. Generation is asynchronous. * - * Usage: gendirs [hdfs://[<hostname>:<port>]]/<path-to-dir> <depth> <fanout> + * Usage: gendirs /<path-to-dir> <depth> <fanout> * - * Example: gendirs hdfs://localhost.localdomain:9433/dir0 3 10 + * Example: gendirs /dir0 3 10 * * @param path-to-dir Absolute path to the directory tree root where the * directory tree will be generated @@ -37,99 +37,76 @@ **/ #include "hdfspp/hdfspp.h" -#include "fs/namenode_operations.h" #include "common/hdfs_configuration.h" #include "common/configuration_loader.h" -#include "common/uri.h" -#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/stubs/common.h> #include <future> -using namespace std; -using namespace hdfs; +#define DEFAULT_PERMISSIONS 0755 -#define SCHEME "hdfs" - -void GenerateDirectories (shared_ptr<FileSystem> fs, int depth, int level, int fanout, string path, vector<future<Status>> & futures) { +void GenerateDirectories (std::shared_ptr<hdfs::FileSystem> fs, int depth, int level, int fanout, std::string path, std::vector<std::future<hdfs::Status>> & futures) { //Level contains our current depth in the directory tree if(level < depth) { for(int i = 0; i < fanout; i++){ //Recursive calls to cover all possible paths from the root to the leave nodes - GenerateDirectories(fs, depth, level+1, fanout, path + "dir" + to_string(i) + "/", futures); + GenerateDirectories(fs, depth, level+1, fanout, path + "dir" + std::to_string(i) + "/", futures); } } else { //We have reached the leaf nodes and now start making calls to create directories //We make a promise which will be set when the call finishes and executes our handler - auto callstate = make_shared<promise<Status>>(); + auto callstate = std::make_shared<std::promise<hdfs::Status>>(); //Extract a future from this promise - future<Status> future(callstate->get_future()); + std::future<hdfs::Status> future(callstate->get_future()); //Save this future to the vector of futures which will be used to wait on all 
promises //after the whole recursion is done - futures.push_back(move(future)); + futures.push_back(std::move(future)); //Create a handler that will be executed when Mkdirs is done - auto handler = [callstate](const Status &s) { + auto handler = [callstate](const hdfs::Status &s) { callstate->set_value(s); }; //Asynchronous call to create this directory along with all missing parent directories - fs->Mkdirs(path, NameNodeOperations::GetDefaultPermissionMask(), true, handler); + fs->Mkdirs(path, DEFAULT_PERMISSIONS, true, handler); } } int main(int argc, char *argv[]) { if (argc != 4) { - cerr << "usage: gendirs [hdfs://[<hostname>:<port>]]/<path-to-dir> <depth> <fanout>" << endl; - return 1; + std::cerr << "usage: gendirs /<path-to-dir> <depth> <fanout>" << std::endl; + exit(EXIT_FAILURE); } - optional<URI> uri; - const string uri_path = argv[1]; - const int depth = stoi(argv[2]); - const int fanout = stoi(argv[3]); + std::string path = argv[1]; + int depth = std::stoi(argv[2]); + int fanout = std::stoi(argv[3]); - //Separate check for scheme is required, otherwise common/uri.h library causes memory issues under valgrind - size_t scheme_end = uri_path.find("://"); - if (scheme_end != string::npos) { - if(uri_path.substr(0, string(SCHEME).size()).compare(SCHEME) != 0) { - cerr << "Scheme " << uri_path.substr(0, scheme_end) << ":// is not supported" << endl; - return 1; - } else { - uri = URI::parse_from_string(uri_path); - } - } - if (!uri) { - cerr << "Malformed URI: " << uri_path << endl; - return 1; - } - - ConfigurationLoader loader; - optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>(); - const char * envHadoopConfDir = getenv("HADOOP_CONF_DIR"); - if (envHadoopConfDir && (*envHadoopConfDir != 0) ) { - config = loader.OverlayResourceFile(*config, string(envHadoopConfDir) + "/core-site.xml"); - } - - Options options; - options.rpc_timeout = numeric_limits<int>::max(); + hdfs::Options options; + //Setting the config path to the 
default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf" + hdfs::ConfigurationLoader loader; + //Loading default config files core-site.xml and hdfs-site.xml from the config path + hdfs::optional<hdfs::HdfsConfiguration> config = loader.LoadDefaultResources<hdfs::HdfsConfiguration>(); + //TODO: HDFS-9539 - after this is resolved, valid config will always be returned. if(config){ + //Loading options from the config options = config->GetOptions(); } - - IoService * io_service = IoService::New(); - - FileSystem *fs_raw = FileSystem::New(io_service, "", options); - if (!fs_raw) { - cerr << "Could not create FileSystem object" << endl; - return 1; + //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish + options.rpc_timeout = std::numeric_limits<int>::max(); + hdfs::IoService * io_service = hdfs::IoService::New(); + //Wrapping fs into a shared pointer to guarantee deletion + std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options)); + if (!fs) { + std::cerr << "Could not connect the file system." << std::endl; + exit(EXIT_FAILURE); } - //Wrapping fs_raw into a unique pointer to guarantee deletion - shared_ptr<FileSystem> fs(fs_raw); - - //Get port from the uri, otherwise use the default port - string port = to_string(uri->get_port().value_or(8020)); - Status stat = fs->Connect(uri->get_host(), port); - if (!stat.ok()) { - cerr << "Could not connect to " << uri->get_host() << ":" << port << endl; - return 1; + hdfs::Status status = fs->ConnectToDefaultFs(); + if (!status.ok()) { + if(!options.defaultFS.get_host().empty()){ + std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl; + } else { + std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl; + } + exit(EXIT_FAILURE); } /** @@ -143,22 +120,23 @@ int main(int argc, char *argv[]) { * processed. 
After the whole recursion is complete we will need to wait until * all promises are set before we can exit. **/ - vector<future<Status>> futures; + std::vector<std::future<hdfs::Status>> futures; - GenerateDirectories(fs, depth, 0, fanout, uri->get_path() + "/", futures); + GenerateDirectories(fs, depth, 0, fanout, path + "/", futures); /** * We are waiting here until all promises are set, and checking whether * the returned statuses contained any errors. **/ - for(future<Status> &fs : futures){ - Status stat = fs.get(); - if (!stat.ok()) { - cerr << "Error: " << stat.ToString() << endl; + for(std::future<hdfs::Status> &fs : futures){ + hdfs::Status status = fs.get(); + if (!status.ok()) { + std::cerr << "Error: " << status.ToString() << std::endl; + exit(EXIT_FAILURE); } } - cout << "All done!" << endl; + std::cout << "All done!" << std::endl; // Clean up static data and prevent valgrind memory leaks google::protobuf::ShutdownProtobufLibrary(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index ce9f0f5..b41857c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -287,6 +287,20 @@ LIBHDFS_EXTERNAL int hdfsPreAttachFileMonitor(libhdfspp_file_event_callback handler, int64_t cookie); +/** + * Finds file name on the file system. hdfsFreeFileInfo should be called to deallocate memory. 
+ * + * @param fs The filesystem (required) + * @param path Path at which to begin search, can have wild cards (must be non-blank) + * @param name Name to find, can have wild cards (must be non-blank) + * @param numEntries Set to the number of files/directories in the result. + * @return Returns a dynamically-allocated array of hdfsFileInfo + * objects; NULL on error or empty result. + * errno is set to non-zero on error or zero on success. + **/ +hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries); + + /***************************************************************************** * HDFS SNAPSHOT FUNCTIONS ****************************************************************************/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index 46fe8e9..4b88fe5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -138,6 +138,18 @@ public: **/ class FileSystem { public: + //Returns the default maximum depth for recursive Find tool + static uint32_t GetDefaultFindMaxDepth(); + + //Returns the default permission mask + static uint16_t GetDefaultPermissionMask(); + + //Checks if the given permission mask is valid + static Status CheckValidPermissionMask(uint16_t permissions); + + //Checks if replication value is valid + static Status CheckValidReplication(uint16_t replication); + /** * Create a new instance of the FileSystem object. 
The call * initializes the RPC connections to the NameNode and returns an @@ -236,7 +248,7 @@ class FileSystem { * * The asynchronous method will return batches of files; the consumer must * return true if they want more files to be delivered. The final bool - * parameter in the callback will be set to true if this is the final + * parameter in the callback will be set to false if this is the final * batch of files. * * The synchronous method will return all files in the directory. @@ -245,9 +257,8 @@ class FileSystem { **/ virtual void GetListing(const std::string &path, - const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) = 0; - virtual Status GetListing(const std::string &path, - std::shared_ptr<std::vector<StatInfo>> & stat_infos) = 0; + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) = 0; + virtual Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) = 0; /** * Returns the locations of all known blocks for the indicated file (or part of it), or an error @@ -297,8 +308,8 @@ class FileSystem { * @param path the path to the file or directory * @param permissions the bitmask to set it to (should be between 0 and 01777) */ - virtual void SetPermission(const std::string & path, - uint16_t permissions, const std::function<void(const Status &)> &handler) = 0; + virtual void SetPermission(const std::string & path, uint16_t permissions, + const std::function<void(const Status &)> &handler) = 0; virtual Status SetPermission(const std::string & path, uint16_t permissions) = 0; /** @@ -307,12 +318,34 @@ class FileSystem { * @param path file path * @param username If it is empty, the original username remains unchanged. * @param groupname If it is empty, the original groupname remains unchanged. + * @param recursive If true, the change will be propagated recursively. 
*/ virtual void SetOwner(const std::string & path, const std::string & username, const std::string & groupname, const std::function<void(const Status &)> &handler) = 0; virtual Status SetOwner(const std::string & path, const std::string & username, const std::string & groupname) = 0; + /** + * Finds all files matching the specified name recursively starting from the + * specified directory. Returns metadata for each of them. + * + * Example: Find("/dir?/tree*", "some?file*name") + * + * @param path Absolute path at which to begin search, can have wild cards (must be non-blank) + * @param name Name to find, can also have wild cards (must be non-blank) + * + * The asynchronous method will return batches of files; the consumer must + * return true if they want more files to be delivered. The final bool + * parameter in the callback will be set to false if this is the final + * batch of files. + * + * The synchronous method will return matching files. + **/ + virtual void + Find(const std::string &path, const std::string &name, const uint32_t maxdepth, + const std::function<bool(const Status &, const std::vector<StatInfo> & , bool)> &handler) = 0; + virtual Status Find(const std::string &path, const std::string &name, + const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) = 0; /***************************************************************************** http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h index a53ab8b..e077dda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/statinfo.h @@ -33,6 +33,7 @@ struct StatInfo { int file_type; ::std::string path; + ::std::string full_path; unsigned long int length; unsigned long int permissions; //Octal number as in POSIX permissions; e.g. 0777 ::std::string owner; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 4003358..a43d94f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -731,22 +731,21 @@ hdfsFileInfo *hdfsListDirectory(hdfsFS fs, const char* path, int *numEntries) { if(!abs_path) { return nullptr; } - std::shared_ptr<std::vector<StatInfo>> stat_infos; - Status stat = fs->get_impl()->GetListing(*abs_path, stat_infos); + std::vector<StatInfo> stat_infos; + Status stat = fs->get_impl()->GetListing(*abs_path, &stat_infos); if (!stat.ok()) { Error(stat); *numEntries = 0; return nullptr; } - //Existing API expects nullptr if size is 0 - if(!stat_infos || stat_infos->size()==0){ + if(stat_infos.empty()){ *numEntries = 0; return nullptr; } - *numEntries = stat_infos->size(); - hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos->size()]; - for(std::vector<StatInfo>::size_type i = 0; i < stat_infos->size(); i++) { - StatInfoToHdfsFileInfo(&file_infos[i], stat_infos->at(i)); + *numEntries = stat_infos.size(); + hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()]; + for(std::vector<StatInfo>::size_type i = 0; i < stat_infos.size(); i++) { + 
StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i)); } return file_infos; @@ -785,7 +784,7 @@ int hdfsCreateDirectory(hdfsFS fs, const char* path) { } Status stat; //Use default permissions and set true for creating all non-existant parent directories - stat = fs->get_impl()->Mkdirs(*abs_path, NameNodeOperations::GetDefaultPermissionMask(), true); + stat = fs->get_impl()->Mkdirs(*abs_path, FileSystem::GetDefaultPermissionMask(), true); if (!stat.ok()) { return Error(stat); } @@ -854,7 +853,7 @@ int hdfsChmod(hdfsFS fs, const char* path, short mode){ if(!abs_path) { return -1; } - Status stat = NameNodeOperations::CheckValidPermissionMask(mode); + Status stat = FileSystem::CheckValidPermissionMask(mode); if (!stat.ok()) { return Error(stat); } @@ -896,6 +895,44 @@ int hdfsChown(hdfsFS fs, const char* path, const char *owner, const char *group) } } +hdfsFileInfo * hdfsFind(hdfsFS fs, const char* path, const char* name, uint32_t * numEntries){ + try { + errno = 0; + if (!CheckSystem(fs)) { + *numEntries = 0; + return nullptr; + } + + std::vector<StatInfo> stat_infos; + Status stat = fs->get_impl()->Find(path, name, hdfs::FileSystem::GetDefaultFindMaxDepth(), &stat_infos); + if (!stat.ok()) { + Error(stat); + *numEntries = 0; + return nullptr; + } + //Existing API expects nullptr if size is 0 + if(stat_infos.empty()){ + *numEntries = 0; + return nullptr; + } + *numEntries = stat_infos.size(); + hdfsFileInfo *file_infos = new hdfsFileInfo[stat_infos.size()]; + for(std::vector<StatInfo>::size_type i = 0; i < stat_infos.size(); i++) { + StatInfoToHdfsFileInfo(&file_infos[i], stat_infos.at(i)); + } + + return file_infos; + } catch (const std::exception & e) { + ReportException(e); + *numEntries = 0; + return nullptr; + } catch (...) 
{ + ReportCaughtNonException(); + *numEntries = 0; + return nullptr; + } +} + int hdfsCreateSnapshot(hdfsFS fs, const char* path, const char* name) { try { errno = 0; @@ -1373,19 +1410,18 @@ HdfsConfiguration LoadDefault(ConfigurationLoader & loader) } else { - return loader.New<HdfsConfiguration>(); + return loader.NewConfig<HdfsConfiguration>(); } } -hdfsBuilder::hdfsBuilder() : config(loader.New<HdfsConfiguration>()) +hdfsBuilder::hdfsBuilder() : config(loader.NewConfig<HdfsConfiguration>()) { errno = 0; - loader.SetDefaultSearchPath(); config = LoadDefault(loader); } hdfsBuilder::hdfsBuilder(const char * directory) : - config(loader.New<HdfsConfiguration>()) + config(loader.NewConfig<HdfsConfiguration>()) { errno = 0; loader.SetSearchPath(directory); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc index 1eb70c3..e1434dad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.cc @@ -62,6 +62,12 @@ bool str_to_bool(const std::string& raw) { return false; } +ConfigurationLoader::ConfigurationLoader() { + //In order to create a configuration loader with the default search path + //("$HADOOP_CONF_DIR" or "/etc/hadoop/conf") we call SetDefaultSearchPath(). 
+ ConfigurationLoader::SetDefaultSearchPath(); +} + void ConfigurationLoader::SetDefaultSearchPath() { // Try (in order, taking the first valid one): // $HADOOP_CONF_DIR @@ -257,4 +263,4 @@ bool ConfigurationLoader::UpdateMapWithValue(ConfigMap& map, return true; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h index 059e48b..51ac23a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader.h @@ -27,9 +27,9 @@ namespace hdfs { class ConfigurationLoader { public: // Creates a new, empty Configuration object - // T must be Configuration or a subclass + // T must be Configuration or a subclass template<class T> - T New(); + T NewConfig(); /**************************************************************************** * LOADING CONFIG FILES @@ -79,6 +79,10 @@ public: * SEARCH PATH METHODS ***************************************************************************/ + //Creates a configuration loader with the default search path ("$HADOOP_CONF_DIR" or "/etc/hadoop/conf"). 
+ //If you want to explicitly set the entire search path, call ClearSearchPath() first + ConfigurationLoader(); + // Sets the search path to the default search path (namely, "$HADOOP_CONF_DIR" or "/etc/hadoop/conf") void SetDefaultSearchPath(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h index 9e18878..6258450 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/configuration_loader_impl.h @@ -23,7 +23,7 @@ namespace hdfs { template<class T> -T ConfigurationLoader::New() { +T ConfigurationLoader::NewConfig() { return T(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index de6ebb7..d75939f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -26,6 +26,7 @@ #include <tuple> #include <iostream> #include <pwd.h> +#include <fnmatch.h> #define FMT_THIS_ADDR "this=" << (void*)this @@ -39,6 +40,34 @@ using ::asio::ip::tcp; static constexpr uint16_t kDefaultPort = 
8020; +uint32_t FileSystem::GetDefaultFindMaxDepth() { + return std::numeric_limits<uint32_t>::max(); +} + +uint16_t FileSystem::GetDefaultPermissionMask() { + return 0755; +} + +Status FileSystem::CheckValidPermissionMask(uint16_t permissions) { + if (permissions > 01777) { + std::stringstream errormsg; + errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct + << std::showbase << permissions << " (should be between 0 and 01777)"; + return Status::InvalidArgument(errormsg.str().c_str()); + } + return Status::OK(); +} + +Status FileSystem::CheckValidReplication(uint16_t replication) { + if (replication < 1 || replication > 512) { + std::stringstream errormsg; + errormsg << "CheckValidReplication: argument 'replication' is " + << replication << " (should be between 1 and 512)"; + return Status::InvalidArgument(errormsg.str().c_str()); + } + return Status::OK(); +} + /***************************************************************************** * FILESYSTEM BASE CLASS ****************************************************************************/ @@ -446,7 +475,7 @@ void FileSystemImpl::SetReplication(const std::string & path, int16_t replicatio handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); return; } - Status replStatus = NameNodeOperations::CheckValidReplication(replication); + Status replStatus = FileSystem::CheckValidReplication(replication); if (!replStatus.ok()) { handler(replStatus); return; @@ -593,44 +622,43 @@ Status FileSystemImpl::GetFsStats(FsInfo & fs_info) { * Some compilers don't like recursive lambdas, so we make the lambda call a * method, which in turn creates a lambda calling itself. 
*/ -void FileSystemImpl::GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more, - std::string path, - const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) { - bool has_next = stat_infos && stat_infos->size() > 0; +void FileSystemImpl::GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more, + std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) { + bool has_next = !stat_infos.empty(); bool get_more = handler(stat, stat_infos, has_more && has_next); if (get_more && has_more && has_next ) { - auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) { + auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { GetListingShim(stat, stat_infos, has_more, path, handler); }; - std::string last = stat_infos->back().path; + std::string last = stat_infos.back().path; nn_.GetListing(path, callback, last); } } void FileSystemImpl::GetListing( const std::string &path, - const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler) { + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) { LOG_INFO(kFileSystem, << "FileSystemImpl::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called"); // Caputure the state and push it into the shim - auto callback = [this, path, handler](const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more) { + auto callback = [this, path, handler](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { GetListingShim(stat, stat_infos, has_more, path, handler); }; nn_.GetListing(path, callback); } -Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) { +Status 
FileSystemImpl::GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) { LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called"); - // In this case, we're going to allocate the result on the heap and have the - // async code populate it. - auto results = std::make_shared<std::vector<StatInfo>>(); + if (!stat_infos) { + return Status::InvalidArgument("FileSystemImpl::GetListing: argument 'stat_infos' cannot be NULL"); + } auto callstate = std::make_shared<std::promise<Status>>(); std::future<Status> future(callstate->get_future()); @@ -640,9 +668,9 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std:: Keep requesting more until we get the entire listing, and don't set the promise * until we have the entire listing. */ - auto h = [callstate, results](const Status &s, std::shared_ptr<std::vector<StatInfo>> si, bool has_more) -> bool { - if (si) { - results->insert(results->end(), si->begin(), si->end()); + auto h = [callstate, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more) -> bool { + if (!si.empty()) { + stat_infos->insert(stat_infos->end(), si.begin(), si.end()); } bool done = !s.ok() || !has_more; @@ -658,11 +686,6 @@ Status FileSystemImpl::GetListing(const std::string &path, std::shared_ptr<std:: /* block until promise is set */ Status stat = future.get(); - if (!stat.ok()) { - return stat; - } - - stat_infos = results; return stat; } @@ -677,7 +700,7 @@ void FileSystemImpl::Mkdirs(const std::string & path, uint16_t permissions, bool return; } - Status permStatus = NameNodeOperations::CheckValidPermissionMask(permissions); + Status permStatus = FileSystem::CheckValidPermissionMask(permissions); if (!permStatus.ok()) { handler(permStatus); return; @@ -790,7 +813,7 @@ void FileSystemImpl::SetPermission(const std::string & path, handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty")); return; } - Status 
permStatus = NameNodeOperations::CheckValidPermissionMask(permissions); + Status permStatus = FileSystem::CheckValidPermissionMask(permissions); if (!permStatus.ok()) { handler(permStatus); return; @@ -832,8 +855,8 @@ void FileSystemImpl::SetOwner(const std::string & path, const std::string & user nn_.SetOwner(path, username, groupname, handler); } -Status FileSystemImpl::SetOwner(const std::string & path, - const std::string & username, const std::string & groupname) { +Status FileSystemImpl::SetOwner(const std::string & path, const std::string & username, + const std::string & groupname) { LOG_DEBUG(kFileSystem, << "FileSystemImpl::[sync]SetOwner(" << FMT_THIS_ADDR << ", path=" << path << ", username=" << username << ", groupname=" << groupname << ") called"); @@ -849,10 +872,179 @@ Status FileSystemImpl::SetOwner(const std::string & path, /* block until promise is set */ Status stat = future.get(); - return stat; } +/** + * Helper function for recursive Find calls. + * + * Some compilers don't like recursive lambdas, so we make the lambda call a + * method, which in turn creates a lambda calling itself. + * + * ***High-level explanation*** + * + * Since we allow wild cards in both path and name, we start by expanding the path first. + * Boolean search_path is set to true when we search for the path and false when we search for the name. + * When we search for the path we break the given path pattern into sub-directories. Starting from the + * first sub-directory we list them one-by-one and recursively continue into directories that matched the + * path pattern at the current depth. Directories that are large will be requested to continue sending + * the results. We keep track of the current depth within the path pattern in the 'depth' variable. + * This continues recursively until the depth reaches the end of the path. After that we start matching + * the name pattern. 
All directories that we find we recurse now, and all names that match the given name + * pattern are being stored in outputs and later sent back to the user. + */ +void FileSystemImpl::FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more, + std::shared_ptr<FindOperationalState> operational_state, std::shared_ptr<FindSharedState> shared_state) { + //We buffer the outputs then send them back at the end + std::vector<StatInfo> outputs; + //Return on error + if(!stat.ok()){ + std::lock_guard<std::mutex> find_lock(shared_state->lock); + //We send true because we do not want the user code to exit before all our requests have finished + shared_state->handler(stat, outputs, true); + shared_state->aborted = true; + } + if(!shared_state->aborted){ + //User did not abort the operation + if (directory_has_more) { + //Directory is large and has more results + //We launch another async call to get more results + shared_state->outstanding_requests++; + auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + FindShim(stat, stat_infos, has_more, operational_state, shared_state); + }; + std::string last = stat_infos.back().path; + nn_.GetListing(operational_state->path, callback, last); + } + if(operational_state->search_path && operational_state->depth < shared_state->dirs.size() - 1){ + //We are searching for the path and did not reach the end of the path yet + for (StatInfo const& si : stat_infos) { + //If we are at the last depth and it matches both path and name, we need to output it. 
+ if (operational_state->depth == shared_state->dirs.size() - 2 + && !fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0) + && !fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)) { + outputs.push_back(si); + } + //Skip if not directory + if(si.file_type != StatInfo::IS_DIR) { + continue; + } + //Checking for a match with the path at the current depth + if(!fnmatch(shared_state->dirs[operational_state->depth + 1].c_str(), si.path.c_str(), 0)){ + //Launch a new request for every matched directory + shared_state->outstanding_requests++; + auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, true); //true because searching for the path + FindShim(stat, stat_infos, has_more, new_current_state, shared_state); + }; + nn_.GetListing(si.full_path, callback); + } + } + } + else if(shared_state->maxdepth > operational_state->depth - shared_state->dirs.size() + 1){ + //We are searching for the name now and maxdepth has not been reached + for (StatInfo const& si : stat_infos) { + //Launch a new request for every directory + if(si.file_type == StatInfo::IS_DIR) { + shared_state->outstanding_requests++; + auto callback = [this, si, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more) { + std::shared_ptr<FindOperationalState> new_current_state = std::make_shared<FindOperationalState>(si.full_path, operational_state->depth + 1, false); //false because searching for the name + FindShim(stat, stat_infos, has_more, new_current_state, shared_state); + }; + nn_.GetListing(si.full_path, callback); + } + //All names that match the specified name are saved to outputs + if(!fnmatch(shared_state->name.c_str(), si.path.c_str(), 0)){ + outputs.push_back(si); + } + } + } + 
} + //This section needs a lock to make sure we return the final chunk only once + //and no results are sent after aborted is set + std::lock_guard<std::mutex> find_lock(shared_state->lock); + //Decrement the counter once since we are done with this chunk + shared_state->outstanding_requests--; + if(shared_state->outstanding_requests == 0){ + //Send the outputs back to the user and notify that this is the final chunk + shared_state->handler(stat, outputs, false); + } else { + //There will be more results and we are not aborting + if (outputs.size() > 0 && !shared_state->aborted){ + //Send the outputs back to the user and notify that there is more + bool user_wants_more = shared_state->handler(stat, outputs, true); + if(!user_wants_more) { + //Abort if user doesn't want more + shared_state->aborted = true; + } + } + } +} + +void FileSystemImpl::Find( + const std::string &path, const std::string &name, const uint32_t maxdepth, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) { + LOG_INFO(kFileSystem, << "FileSystemImpl::Find(" + << FMT_THIS_ADDR << ", path=" + << path << ", name=" + << name << ") called"); + + //Populating the operational state, which includes: + //current search path, depth within the path, and the indication that we are currently searching for a path (not name yet). + std::shared_ptr<FindOperationalState> operational_state = std::make_shared<FindOperationalState>(path, 0, true); + //Populating the shared state, which includes: + //vector of sub-directories constructed from path, name to search, handler to use for result returning, outstanding_requests counter, and aborted flag. 
+ std::shared_ptr<FindSharedState> shared_state = std::make_shared<FindSharedState>(path, name, maxdepth, handler, 1, false); + auto callback = [this, operational_state, shared_state](const Status &stat, const std::vector<StatInfo> & stat_infos, bool directory_has_more) { + FindShim(stat, stat_infos, directory_has_more, operational_state, shared_state); + }; + nn_.GetListing("/", callback); +} + +Status FileSystemImpl::Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) { + LOG_INFO(kFileSystem, << "FileSystemImpl::[sync]Find(" + << FMT_THIS_ADDR << ", path=" + << path << ", name=" + << name << ") called"); + + if (!stat_infos) { + return Status::InvalidArgument("FileSystemImpl::Find: argument 'stat_infos' cannot be NULL"); + } + + // In this case, we're going to have the async code populate stat_infos. + + std::promise<void> promise = std::promise<void>(); + std::future<void> future(promise.get_future()); + Status status = Status::OK(); + + /** + * Keep requesting more until we get the entire listing. Set the promise + * when we have the entire listing to stop. + * + * Find guarantees that the handler will only be called once at a time, + * so we do not need any locking here + */ + auto h = [&status, &promise, stat_infos](const Status &s, const std::vector<StatInfo> & si, bool has_more_results) -> bool { + if (!si.empty()) { + stat_infos->insert(stat_infos->end(), si.begin(), si.end()); + } + if (!s.ok() && status.ok()){ + //We make sure we set 'status' only on the first error. 
+ status = s; + } + if (!has_more_results) { + promise.set_value(); + return false; + } + return true; + }; + + Find(path, name, maxdepth, h); + + /* block until promise is set */ + future.get(); + return status; +} void FileSystemImpl::CreateSnapshot(const std::string &path, const std::string &name, http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index 75a43f3..0e9cedd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -94,9 +94,9 @@ public: void GetListing( const std::string &path, - const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> &handler) override; + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override; - Status GetListing(const std::string &path, std::shared_ptr<std::vector<StatInfo>> &stat_infos) override; + Status GetListing(const std::string &path, std::vector<StatInfo> * stat_infos) override; virtual void GetBlockLocations(const std::string & path, uint64_t offset, uint64_t length, const std::function<void(const Status &, std::shared_ptr<FileBlockLocation> locations)> ) override; @@ -115,8 +115,8 @@ public: const std::function<void(const Status &)> &handler) override; virtual Status Rename(const std::string &oldPath, const std::string &newPath) override; - virtual void SetPermission(const std::string & path, - uint16_t permissions, const std::function<void(const Status &)> &handler) override; + virtual void SetPermission(const std::string & path, 
uint16_t permissions, + const std::function<void(const Status &)> &handler) override; virtual Status SetPermission(const std::string & path, uint16_t permissions) override; virtual void SetOwner(const std::string & path, const std::string & username, @@ -124,6 +124,11 @@ public: virtual Status SetOwner(const std::string & path, const std::string & username, const std::string & groupname) override; + void Find( + const std::string &path, const std::string &name, const uint32_t maxdepth, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler) override; + Status Find(const std::string &path, const std::string &name, const uint32_t maxdepth, std::vector<StatInfo> * stat_infos) override; + /***************************************************************************** * FILE SYSTEM SNAPSHOT FUNCTIONS ****************************************************************************/ @@ -204,9 +209,58 @@ private: **/ std::shared_ptr<LibhdfsEvents> event_handlers_; - void GetListingShim(const Status &stat, std::shared_ptr<std::vector<StatInfo>> &stat_infos, bool has_more, - std::string path, - const std::function<bool(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> &handler); + void GetListingShim(const Status &stat, const std::vector<StatInfo> & stat_infos, bool has_more, + std::string path, const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> &handler); + + struct FindSharedState { + //Name pattern (can have wild-cards) to find + const std::string name; + //Maximum depth to recurse after the end of path is reached. + //Can be set to 0 for pure path globbing and ignoring name pattern entirely. 
+ const uint32_t maxdepth; + //Vector of all sub-directories from the path argument (each can have wild-cards) + std::vector<std::string> dirs; + //Callback from Find + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler; + //outstanding_requests is incremented once for every GetListing call. + std::atomic<uint64_t> outstanding_requests; + //Boolean needed to abort all recursion on error or on user command + std::atomic<bool> aborted; + //Shared variables will need protection with a lock + std::mutex lock; + FindSharedState(const std::string path_, const std::string name_, const uint32_t maxdepth_, + const std::function<bool(const Status &, const std::vector<StatInfo> &, bool)> handler_, + uint64_t outstanding_recuests_, bool aborted_) + : name(name_), + maxdepth(maxdepth_), + handler(handler_), + outstanding_requests(outstanding_recuests_), + aborted(aborted_), + lock() { + //Constructing the list of sub-directories + std::stringstream ss(path_); + if(path_.back() != '/'){ + ss << "/"; + } + for (std::string token; std::getline(ss, token, '/'); ) { + dirs.push_back(token); + } + } + }; + + struct FindOperationalState { + const std::string path; + const uint32_t depth; + const bool search_path; + FindOperationalState(const std::string path_, const uint32_t depth_, const bool search_path_) + : path(path_), + depth(depth_), + search_path(search_path_) { + } + }; + + void FindShim(const Status &stat, const std::vector<StatInfo> & stat_infos, + bool directory_has_more, std::shared_ptr<FindOperationalState> current_state, std::shared_ptr<FindSharedState> shared_state); }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc index 27ccb5d..89acac3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.cc @@ -39,30 +39,6 @@ namespace hdfs { * NAMENODE OPERATIONS ****************************************************************************/ -uint16_t NameNodeOperations::GetDefaultPermissionMask() { - return 0755; -} - -Status NameNodeOperations::CheckValidPermissionMask(uint16_t permissions) { - if (permissions > 01777) { - std::stringstream errormsg; - errormsg << "CheckValidPermissionMask: argument 'permissions' is " << std::oct - << std::showbase << permissions << " (should be between 0 and 01777)"; - return Status::InvalidArgument(errormsg.str().c_str()); - } - return Status::OK(); -} - -Status NameNodeOperations::CheckValidReplication(uint16_t replication) { - if (replication < 1 || replication > 512) { - std::stringstream errormsg; - errormsg << "CheckValidReplication: argument 'replication' is " - << replication << " (should be between 1 and 512)"; - return Status::InvalidArgument(errormsg.str().c_str()); - } - return Status::OK(); -} - void NameNodeOperations::Connect(const std::string &cluster_name, const std::vector<ResolvedNamenodeInfo> &servers, std::function<void(const Status &)> &&handler) { @@ -170,7 +146,7 @@ void NameNodeOperations::SetReplication(const std::string & path, int16_t replic handler(Status::InvalidArgument("SetReplication: argument 'path' cannot be empty")); return; } - Status replStatus = CheckValidReplication(replication); + Status replStatus = FileSystemImpl::CheckValidReplication(replication); if (!replStatus.ok()) { handler(replStatus); return; @@ -252,7 +228,8 @@ void NameNodeOperations::GetFileInfo(const std::string & path, // no fs in the protobuf. 
if(resp -> has_fs()){ struct StatInfo stat_info; - stat_info.path=path; + stat_info.path = path; + stat_info.full_path = path; HdfsFileStatusProtoToStatInfo(stat_info, resp->fs()); handler(stat, stat_info); } else { @@ -290,7 +267,7 @@ void NameNodeOperations::GetFsStats( void NameNodeOperations::GetListing( const std::string & path, - std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>> &, bool)> handler, + std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler, const std::string & start_after) { using ::hadoop::hdfs::GetListingRequestProto; using ::hadoop::hdfs::GetListingResponseProto; @@ -300,8 +277,8 @@ void NameNodeOperations::GetListing( << "NameNodeOperations::GetListing(" << FMT_THIS_ADDR << ", path=" << path << ") called"); if (path.empty()) { - std::shared_ptr<std::vector<StatInfo>> stat_infos; - handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), stat_infos, false); + std::vector<StatInfo> empty; + handler(Status::InvalidArgument("GetListing: argument 'path' cannot be empty"), empty, false); return; } @@ -312,31 +289,26 @@ void NameNodeOperations::GetListing( auto resp = std::make_shared<GetListingResponseProto>(); - namenode_.GetListing( - &req, - resp, - [resp, handler, path](const Status &stat) { - if (stat.ok()) { - if(resp -> has_dirlist()){ - std::shared_ptr<std::vector<StatInfo>> stat_infos(new std::vector<StatInfo>); - for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) { - StatInfo si; - si.path=fs.path(); - HdfsFileStatusProtoToStatInfo(si, fs); - stat_infos->push_back(si); - } - handler(stat, stat_infos, resp->dirlist().remainingentries() > 0); - } else { - std::string errormsg = "No such file or directory: " + path; - Status statNew = Status::PathNotFound(errormsg.c_str()); - std::shared_ptr<std::vector<StatInfo>> stat_infos; - handler(statNew, stat_infos, false); - } - } else { - std::shared_ptr<std::vector<StatInfo>> stat_infos; - 
handler(stat, stat_infos, false); + namenode_.GetListing(&req, resp, [resp, handler, path](const Status &stat) { + std::vector<StatInfo> stat_infos; + if (stat.ok()) { + if(resp -> has_dirlist()){ + for (::hadoop::hdfs::HdfsFileStatusProto const& fs : resp->dirlist().partiallisting()) { + StatInfo si; + si.path = fs.path(); + si.full_path = path + fs.path() + "/"; + HdfsFileStatusProtoToStatInfo(si, fs); + stat_infos.push_back(si); } - }); + handler(stat, stat_infos, resp->dirlist().remainingentries() > 0); + } else { + std::string errormsg = "No such file or directory: " + path; + handler(Status::PathNotFound(errormsg.c_str()), stat_infos, false); + } + } else { + handler(stat, stat_infos, false); + } + }); } void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, bool createparent, @@ -355,7 +327,7 @@ void NameNodeOperations::Mkdirs(const std::string & path, uint16_t permissions, } MkdirsRequestProto req; - Status permStatus = CheckValidPermissionMask(permissions); + Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); if (!permStatus.ok()) { handler(permStatus); return; @@ -471,7 +443,7 @@ void NameNodeOperations::SetPermission(const std::string & path, handler(Status::InvalidArgument("SetPermission: argument 'path' cannot be empty")); return; } - Status permStatus = CheckValidPermissionMask(permissions); + Status permStatus = FileSystemImpl::CheckValidPermissionMask(permissions); if (!permStatus.ok()) { handler(permStatus); return; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h index 3afa2e9..60efacc 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/namenode_operations.h @@ -48,12 +48,6 @@ public: engine_(io_service, options, client_name, user_name, protocol_name, protocol_version), namenode_(& engine_), options_(options) {} - static uint16_t GetDefaultPermissionMask(); - - static Status CheckValidPermissionMask(uint16_t permissions); - - static Status CheckValidReplication(uint16_t replication); - void Connect(const std::string &cluster_name, const std::vector<ResolvedNamenodeInfo> &servers, std::function<void(const Status &)> &&handler); @@ -77,7 +71,7 @@ public: // start_after="" for initial call void GetListing(const std::string & path, - std::function<void(const Status &, std::shared_ptr<std::vector<StatInfo>>&, bool)> handler, + std::function<void(const Status &, const std::vector<StatInfo> &, bool)> handler, const std::string & start_after = ""); void Mkdirs(const std::string & path, uint16_t permissions, bool createparent, http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc index 4d49728..9534204 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.cc @@ -33,21 +33,27 @@ TEST(ConfigurationTest, TestDegenerateInputs) { /* Completely empty stream */ { std::stringstream stream; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(""); + 
ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(""); EXPECT_FALSE(config && "Empty stream"); } /* No values */ { std::string data = "<configuration></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(data); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(data); EXPECT_TRUE(config && "Blank config"); } /* Extraneous values */ { std::string data = "<configuration><spam></spam></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(data); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(data); EXPECT_TRUE(config && "Extraneous values"); } } @@ -57,7 +63,9 @@ TEST(ConfigurationTest, TestBasicOperations) { { std::stringstream stream; simpleConfigStream(stream, "key1", "value1"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -74,7 +82,9 @@ TEST(ConfigurationTest, TestBasicOperations) { { std::stringstream stream; simpleConfigStream(stream, "key1", "value1"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); EXPECT_EQ("value1", config->GetWithDefault("KEY1", "")); } @@ -83,7 +93,9 @@ TEST(ConfigurationTest, TestBasicOperations) { { std::stringstream stream; 
simpleConfigStream(stream, "key1", "value1"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional<std::string> value = config->Get("key1"); EXPECT_TRUE((bool)value); @@ -97,7 +109,9 @@ TEST(ConfigurationTest, TestCompactValues) { std::stringstream stream; stream << "<configuration><property name=\"key1\" " "value=\"value1\"/></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Compact value parse"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -108,7 +122,9 @@ TEST(ConfigurationTest, TestMultipleResources) { { std::stringstream stream; simpleConfigStream(stream, "key1", "value1"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -129,7 +145,9 @@ TEST(ConfigurationTest, TestStringResource) { simpleConfigStream(stream, "key1", "value1"); std::string str = stream.str(); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -171,7 +189,9 @@ TEST(ConfigurationTest, TestFinal) { 
std::stringstream stream; stream << "<configuration><property><name>key1</name><value>value1</" "value><final>false</final></property></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -187,7 +207,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "<configuration><property><name>key1</name><value>value1</" "value><final>true</final></property></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -203,7 +225,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "<configuration><property name=\"key1\" value=\"value1\" " "final=\"false\"/></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -219,7 +243,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "<configuration><property name=\"key1\" value=\"value1\" " "final=\"true\"/></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = 
config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -235,7 +261,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "<configuration><property><name>key1</name><value>value1</" "value><final>spam</final></property></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -251,7 +279,9 @@ TEST(ConfigurationTest, TestFinal) { std::stringstream stream; stream << "<configuration><property><name>key1</name><value>value1</" "value><final></final></property></configuration>"; - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); @@ -271,7 +301,9 @@ TEST(ConfigurationTest, TestFileReads) TempFile tempFile; writeSimpleConfig(tempFile.filename, "key1", "value1"); - optional<Configuration> config = ConfigurationLoader().LoadFromFile<Configuration>(tempFile.filename); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.LoadFromFile<Configuration>(tempFile.filename); EXPECT_TRUE(config && "Parse first stream"); EXPECT_EQ("value1", config->GetWithDefault("key1", "")); } @@ -298,7 +330,9 @@ TEST(ConfigurationTest, TestFileReads) { TempDir tempDir; - optional<Configuration> config = ConfigurationLoader().LoadFromFile<Configuration>(tempDir.path); + ConfigurationLoader config_loader; + 
config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.LoadFromFile<Configuration>(tempDir.path); EXPECT_FALSE(config && "Add directory as file resource"); } @@ -359,7 +393,9 @@ TEST(ConfigurationTest, TestIntConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "1"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional<int64_t> value = config->GetInt("key1"); EXPECT_TRUE((bool)value); @@ -398,7 +434,9 @@ TEST(ConfigurationTest, TestDoubleConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "1"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional<double> value = config->GetDouble("key1"); EXPECT_TRUE((bool)value); @@ -441,7 +479,9 @@ TEST(ConfigurationTest, TestBoolConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "true"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional<bool> value = config->GetBool("key1"); EXPECT_TRUE((bool)value); @@ -488,7 +528,9 @@ TEST(ConfigurationTest, TestUriConversions) { { std::stringstream stream; simpleConfigStream(stream, "key1", "hdfs:///"); - optional<Configuration> config = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + 
config_loader.ClearSearchPath(); + optional<Configuration> config = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE(config && "Parse single value"); optional<URI> value = config->GetUri("key1"); EXPECT_TRUE((bool)value); http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h index 669557f..7947ff5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/configuration_test.h @@ -54,7 +54,9 @@ template <typename... Args> optional<Configuration> simpleConfig(Args... 
args) { std::stringstream stream; simpleConfigStream(stream, args...); - optional<Configuration> parse = ConfigurationLoader().Load<Configuration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<Configuration> parse = config_loader.Load<Configuration>(stream.str()); EXPECT_TRUE((bool)parse); return parse; http://git-wip-us.apache.org/repos/asf/hadoop/blob/4f6cb5d1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc index 7e9ca66..360f886 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/hdfs_configuration_test.cc @@ -30,7 +30,9 @@ TEST(HdfsConfigurationTest, TestDefaultOptions) { // Completely empty stream { - HdfsConfiguration empty_config = ConfigurationLoader().New<HdfsConfiguration>(); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + HdfsConfiguration empty_config = config_loader.NewConfig<HdfsConfiguration>(); Options options = empty_config.GetOptions(); EXPECT_EQ(Options::kDefaultRpcTimeout, options.rpc_timeout); } @@ -49,8 +51,9 @@ TEST(HdfsConfigurationTest, TestSetOptions) HdfsConfiguration::kIpcClientConnectTimeoutKey, 103, HdfsConfiguration::kHadoopSecurityAuthenticationKey, HdfsConfiguration::kHadoopSecurityAuthentication_kerberos ); - - optional<HdfsConfiguration> config = ConfigurationLoader().Load<HdfsConfiguration>(stream.str()); + ConfigurationLoader config_loader; + config_loader.ClearSearchPath(); + optional<HdfsConfiguration> config = 
config_loader.Load<HdfsConfiguration>(stream.str()); EXPECT_TRUE(config && "Read stream"); Options options = config->GetOptions(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
