http://git-wip-us.apache.org/repos/asf/hadoop/blob/55b3fdfe/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc
new file mode 100644
index 0000000..133bbc9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cc
@@ -0,0 +1,115 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+*/
+
+#include "tools_common.h"
+
+namespace hdfs {
+
+  std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout) {
+    hdfs::Options options;
+    //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
+    hdfs::ConfigurationLoader loader;
+    //Loading default config files core-site.xml and hdfs-site.xml from the config path
+    hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
+    //TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
+    if(config){
+      //Loading options from the config
+      options = config->GetOptions();
+    }
+    if(max_timeout){
+      //TODO: HDFS-9539 - until then we increase the time-out to allow all recursive async calls to finish
+      options.rpc_timeout = std::numeric_limits<int>::max();
+    }
+    IoService * io_service = IoService::New();
+    //Wrapping fs into a shared pointer to guarantee deletion
+    std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
+    if (!fs) {
+      std::cerr << "Could not create FileSystem object. " << std::endl;
+      exit(EXIT_FAILURE);
+    }
+    Status status;
+    //Check if the user supplied the host
+    if(!uri.get_host().empty()){
+      //If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs.
+      std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : "";
+      status = fs->Connect(uri.get_host(), port);
+      if (!status.ok()) {
+        std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl;
+        exit(EXIT_FAILURE);
+      }
+    } else {
+      status = fs->ConnectToDefaultFs();
+      if (!status.ok()) {
+        if(!options.defaultFS.get_host().empty()){
+          std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
+        } else {
+          std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
+        }
+        exit(EXIT_FAILURE);
+      }
+    }
+    return fs;
+  }
+
+  #define BUF_SIZE 1048576 //1 MB
+  static char input_buffer[BUF_SIZE];
+
+  void readFile(std::shared_ptr<hdfs::FileSystem> fs, std::string path, off_t offset, std::FILE* dst_file, bool to_delete) {
+    ssize_t total_bytes_read = 0;
+    size_t last_bytes_read = 0;
+
+    hdfs::FileHandle *file_raw = nullptr;
+    hdfs::Status status = fs->Open(path, &file_raw);
+    if (!status.ok()) {
+      std::cerr << "Could not open file " << path << ". " << status.ToString() << std::endl;
+      exit(EXIT_FAILURE);
+    }
+    //wrapping file_raw into a unique pointer to guarantee deletion
+    std::unique_ptr<hdfs::FileHandle> file(file_raw);
+
+    do{
+      //Reading file chunks
+      status = file->PositionRead(input_buffer, sizeof(input_buffer), offset, &last_bytes_read);
+      if(status.ok()) {
+        //Writing file chunks to the destination file
+        fwrite(input_buffer, last_bytes_read, 1, dst_file);
+        total_bytes_read += last_bytes_read;
+        offset += last_bytes_read;
+      } else {
+        if(status.is_invalid_offset()){
+          //Reached the end of the file
+          if(to_delete) {
+            //Deleting the file (recursive set to false)
+            hdfs::Status status = fs->Delete(path, false);
+            if (!status.ok()) {
+              std::cerr << "Error deleting the source file: " << path
+                  << " " << status.ToString() << std::endl;
+              exit(EXIT_FAILURE);
+            }
+          }
+          break;
+        } else {
+          std::cerr << "Error reading the file: " << status.ToString() << std::endl;
+          exit(EXIT_FAILURE);
+        }
+      }
+    } while (last_bytes_read > 0);
+    return;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55b3fdfe/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp
deleted file mode 100644
index af882ce..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.cpp
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied.  See the License for the
-  specific language governing permissions and limitations
-  under the License.
-*/
-
-#include "tools_common.h"
-
-namespace hdfs {
-
-  std::shared_ptr<hdfs::Options> getOptions() {
-    std::shared_ptr<hdfs::Options> options = std::make_shared<hdfs::Options>();
-    //Setting the config path to the default: "$HADOOP_CONF_DIR" or "/etc/hadoop/conf"
-    hdfs::ConfigurationLoader loader;
-    //Loading default config files core-site.xml and hdfs-site.xml from the config path
-    hdfs::optional<HdfsConfiguration> config = loader.LoadDefaultResources<HdfsConfiguration>();
-    //TODO: HDFS-9539 - after this is resolved, valid config will always be returned.
-    if(config){
-      //Loading options from the config
-      *options = config->GetOptions();
-    }
-    return options;
-  }
-
-  std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, hdfs::Options & options) {
-    IoService * io_service = IoService::New();
-    //Wrapping fs into a shared pointer to guarantee deletion
-    std::shared_ptr<hdfs::FileSystem> fs(hdfs::FileSystem::New(io_service, "", options));
-    if (!fs) {
-      std::cerr << "Could not create FileSystem object. " << std::endl;
-      exit(EXIT_FAILURE);
-    }
-    Status status;
-    //Check if the user supplied the host
-    if(!uri.get_host().empty()){
-      //If port is supplied we use it, otherwise we use the empty string so that it will be looked up in configs.
-      std::string port = (uri.get_port()) ? std::to_string(uri.get_port().value()) : "";
-      status = fs->Connect(uri.get_host(), port);
-      if (!status.ok()) {
-        std::cerr << "Could not connect to " << uri.get_host() << ":" << port << ". " << status.ToString() << std::endl;
-        exit(EXIT_FAILURE);
-      }
-    } else {
-      status = fs->ConnectToDefaultFs();
-      if (!status.ok()) {
-        if(!options.defaultFS.get_host().empty()){
-          std::cerr << "Error connecting to " << options.defaultFS << ". " << status.ToString() << std::endl;
-        } else {
-          std::cerr << "Error connecting to the cluster: defaultFS is empty. " << status.ToString() << std::endl;
-        }
-        exit(EXIT_FAILURE);
-      }
-    }
-    return fs;
-  }
-
-}
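
For context, the configuration-loading pipeline used by both the deleted getOptions() and the new doConnect() reduces to the sketch below. The helper name loadOptionsFromConfig is hypothetical; the calls themselves are exactly those that appear in both versions of the file.

#include "common/hdfs_configuration.h"
#include "common/configuration_loader.h"

//Load core-site.xml and hdfs-site.xml from "$HADOOP_CONF_DIR" or
//"/etc/hadoop/conf" and turn them into an hdfs::Options object.
hdfs::Options loadOptionsFromConfig() {
  hdfs::ConfigurationLoader loader;
  hdfs::optional<hdfs::HdfsConfiguration> config =
      loader.LoadDefaultResources<hdfs::HdfsConfiguration>();
  //Per the HDFS-9539 TODO in the original code, the optional may be empty,
  //so fall back to default-constructed Options
  return config ? config->GetOptions() : hdfs::Options();
}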
" << status.ToString() << std::endl; - } - exit(EXIT_FAILURE); - } - } - return fs; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/55b3fdfe/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h index 858fc4b..f7c5e09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tools/tools_common.h @@ -23,17 +23,15 @@ #include "hdfspp/hdfspp.h" #include "common/hdfs_configuration.h" #include "common/configuration_loader.h" - #include <mutex> namespace hdfs { - //Pull configurations and get the Options object - std::shared_ptr<hdfs::Options> getOptions(); - //Build all necessary objects and perform the connection - std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, hdfs::Options & options); + std::shared_ptr<hdfs::FileSystem> doConnect(hdfs::URI & uri, bool max_timeout); + //Open HDFS file at offset, read it to destination file, optionally delete source file + void readFile(std::shared_ptr<hdfs::FileSystem> fs, std::string path, off_t offset, std::FILE* dst_file, bool to_delete); } #endif --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org