HDFS-9144. Refactoring libhdfs++ into stateful/ephemeral objects. Contributed by Bob Hansen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a06bc8e1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a06bc8e1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a06bc8e1 Branch: refs/heads/HDFS-8707 Commit: a06bc8e1232622aab0b7e4f26b547125de9ab300 Parents: d6d056d Author: James <[email protected]> Authored: Thu Dec 3 07:30:22 2015 -0500 Committer: James <[email protected]> Committed: Thu Dec 3 07:30:22 2015 -0500 ---------------------------------------------------------------------- .../native/libhdfspp/include/libhdfspp/hdfs.h | 38 +- .../libhdfspp/include/libhdfspp/options.h | 11 + .../main/native/libhdfspp/lib/CMakeLists.txt | 1 + .../libhdfspp/lib/bindings/c/CMakeLists.txt | 2 +- .../native/libhdfspp/lib/bindings/c/hdfs.cc | 37 +- .../native/libhdfspp/lib/bindings/c/hdfs_cpp.cc | 216 --------- .../native/libhdfspp/lib/bindings/c/hdfs_cpp.h | 105 ----- .../native/libhdfspp/lib/common/CMakeLists.txt | 19 +- .../native/libhdfspp/lib/common/async_stream.h | 49 +++ .../libhdfspp/lib/common/continuation/asio.h | 4 +- .../lib/common/continuation/protobuf.h | 21 +- .../libhdfspp/lib/common/hdfs_public_api.cc | 16 - .../main/native/libhdfspp/lib/common/options.cc | 3 +- .../main/native/libhdfspp/lib/common/util.cc | 35 ++ .../src/main/native/libhdfspp/lib/common/util.h | 7 + .../libhdfspp/lib/connection/CMakeLists.txt | 2 + .../lib/connection/datanodeconnection.cc | 57 +++ .../lib/connection/datanodeconnection.h | 66 +++ .../main/native/libhdfspp/lib/fs/CMakeLists.txt | 2 +- .../main/native/libhdfspp/lib/fs/filehandle.cc | 240 ++++++++++ .../main/native/libhdfspp/lib/fs/filehandle.h | 115 +++++ .../main/native/libhdfspp/lib/fs/filesystem.cc | 212 +++++++-- .../main/native/libhdfspp/lib/fs/filesystem.h | 137 +++--- .../main/native/libhdfspp/lib/fs/inputstream.cc | 48 -- .../native/libhdfspp/lib/fs/inputstream_impl.h | 207 --------- .../native/libhdfspp/lib/reader/CMakeLists.txt | 2 +- .../native/libhdfspp/lib/reader/block_reader.cc | 433 +++++++++++++++++++ .../native/libhdfspp/lib/reader/block_reader.h | 83 +++- .../native/libhdfspp/lib/reader/datatransfer.h | 28 +- .../libhdfspp/lib/reader/datatransfer_impl.h | 23 +- .../main/native/libhdfspp/lib/reader/fileinfo.h | 36 ++ .../libhdfspp/lib/reader/remote_block_reader.cc | 46 -- .../lib/reader/remote_block_reader_impl.h | 342 --------------- .../main/native/libhdfspp/lib/rpc/rpc_engine.cc | 14 - .../main/native/libhdfspp/tests/CMakeLists.txt | 12 +- .../native/libhdfspp/tests/bad_datanode_test.cc | 208 ++++++--- .../native/libhdfspp/tests/inputstream_test.cc | 232 ---------- .../native/libhdfspp/tests/mock_connection.h | 15 +- .../libhdfspp/tests/remote_block_reader_test.cc | 202 +++++++-- 39 files changed, 1802 insertions(+), 1524 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h index 06619fd..dfff20b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/hdfs.h @@ -24,6 +24,7 @@ #include <functional> #include <memory> #include <set> +#include <iostream> namespace hdfs { @@ -68,10 +69,10 @@ class NodeExclusionRule { }; /** - * Applications opens an InputStream to read files in HDFS. + * Applications opens a FileHandle to read files in HDFS. **/ -class InputStream { - public: +class FileHandle { +public: /** * Read data from a specific position. The current implementation * stops at the block boundary. @@ -83,10 +84,14 @@ class InputStream { * The handler returns the datanode that serves the block and the number of * bytes has read. **/ - virtual void PositionRead( - void *buf, size_t nbyte, uint64_t offset, - const std::function<void(const Status &, const std::string &, size_t)> & - handler) = 0; + virtual void + PositionRead(void *buf, size_t nbyte, uint64_t offset, + const std::function<void(const Status &, size_t)> &handler) = 0; + + virtual Status PositionRead(void *buf, size_t *nbyte, off_t offset) = 0; + virtual Status Read(void *buf, size_t *nbyte) = 0; + virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0; + /** * Determine if a datanode should be excluded from future operations * based on the return Status. @@ -97,7 +102,7 @@ class InputStream { **/ static bool ShouldExclude(const Status &status); - virtual ~InputStream(); + virtual ~FileHandle(); }; /** @@ -114,15 +119,24 @@ class FileSystem { IoService *io_service, const Options &options, const std::string &server, const std::string &service, const std::function<void(const Status &, FileSystem *)> &handler); + + /* Synchronous call of New*/ + static FileSystem * + New(IoService *io_service, const Options &options, const std::string &server, + const std::string &service); + /** * Open a file on HDFS. The call issues an RPC to the NameNode to * gather the locations of all blocks in the file and to return a * new instance of the @ref InputStream object. **/ - virtual void Open( - const std::string &path, - const std::function<void(const Status &, InputStream *)> &handler) = 0; - virtual ~FileSystem(); + virtual void + Open(const std::string &path, + const std::function<void(const Status &, FileHandle *)> &handler) = 0; + virtual Status Open(const std::string &path, FileHandle **handle) = 0; + + virtual ~FileSystem() {}; + }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h index 0abdfa0..79b9c54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/libhdfspp/options.h @@ -31,6 +31,17 @@ struct Options { int rpc_timeout; /** + * Maximum number of retries for RPC operations + **/ + const static int NO_RPC_RETRY = -1; + int max_rpc_retries; + + /** + * Number of ms to wait between retry of RPC operations + **/ + int rpc_retry_delay_ms; + + /** * Exclusion time for failed datanodes in milliseconds. * Default: 60000 **/ http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt index 434dc4e..c851597 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/CMakeLists.txt @@ -21,4 +21,5 @@ add_subdirectory(fs) add_subdirectory(reader) add_subdirectory(rpc) add_subdirectory(proto) +add_subdirectory(connection) add_subdirectory(bindings) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt index e170370..664518a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/CMakeLists.txt @@ -16,5 +16,5 @@ # under the License. -add_library(bindings_c hdfs.cc hdfs_cpp.cc) +add_library(bindings_c hdfs.cc) add_dependencies(bindings_c fs rpc reader proto common fs rpc reader proto common) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 853e9d3..802b3ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -16,8 +16,10 @@ * limitations under the License. */ -#include "hdfs_cpp.h" +#include "fs/filesystem.h" +#include <hdfs/hdfs.h> +#include <string> #include <cstring> #include <iostream> @@ -25,15 +27,15 @@ using namespace hdfs; /* Seperate the handles used by the C api from the C++ API*/ struct hdfs_internal { - hdfs_internal(HadoopFileSystem *p) : filesystem_(p) {} - hdfs_internal(std::unique_ptr<HadoopFileSystem> p) + hdfs_internal(FileSystem *p) : filesystem_(p) {} + hdfs_internal(std::unique_ptr<FileSystem> p) : filesystem_(std::move(p)) {} virtual ~hdfs_internal(){}; - HadoopFileSystem *get_impl() { return filesystem_.get(); } - const HadoopFileSystem *get_impl() const { return filesystem_.get(); } + FileSystem *get_impl() { return filesystem_.get(); } + const FileSystem *get_impl() const { return filesystem_.get(); } private: - std::unique_ptr<HadoopFileSystem> filesystem_; + std::unique_ptr<FileSystem> filesystem_; }; struct hdfsFile_internal { @@ -102,17 +104,23 @@ bool CheckSystemAndHandle(hdfsFS fs, hdfsFile file) { int hdfsFileIsOpenForRead(hdfsFile file) { /* files can only be open for reads at the moment, do a quick check */ if (file) { - return file->get_impl()->IsOpenForRead(); + return true; // Update implementation when we get file writing } return false; } hdfsFS hdfsConnect(const char *nn, tPort port) { - HadoopFileSystem *fs = new HadoopFileSystem(); - Status stat = fs->Connect(nn, port); - if (!stat.ok()) { + std::string port_as_string = std::to_string(port); + IoService * io_service = IoService::New(); + FileSystem *fs = FileSystem::New(io_service, Options(), nn, port_as_string); + if (!fs) { ReportError(ENODEV, "Unable to connect to NameNode."); - delete fs; + + // FileSystem's ctor might take ownership of the io_service; if it does, + // it will null out the pointer + if (io_service) + delete io_service; + return nullptr; } return new hdfs_internal(fs); @@ -139,7 +147,7 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char *path, int flags, int bufferSize, return nullptr; } FileHandle *f = nullptr; - Status stat = fs->get_impl()->OpenFileForRead(path, &f); + Status stat = fs->get_impl()->Open(path, &f); if (!stat.ok()) { return nullptr; } @@ -150,7 +158,6 @@ int hdfsCloseFile(hdfsFS fs, hdfsFile file) { if (!CheckSystemAndHandle(fs, file)) { return -1; } - delete file; return 0; } @@ -162,8 +169,8 @@ tSize hdfsPread(hdfsFS fs, hdfsFile file, tOffset position, void *buffer, } size_t len = length; - Status stat = file->get_impl()->Pread(buffer, &len, position); - if (!stat.ok()) { + Status stat = file->get_impl()->PositionRead(buffer, &len, position); + if(!stat.ok()) { return Error(stat); } return (tSize)len; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc deleted file mode 100644 index 8872b1d..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.cc +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "hdfs_cpp.h" - -#include <cstdint> -#include <cerrno> -#include <string> -#include <future> -#include <memory> -#include <thread> -#include <vector> -#include <set> -#include <tuple> - -#include <hdfs/hdfs.h> -#include "libhdfspp/hdfs.h" -#include "libhdfspp/status.h" -#include "fs/filesystem.h" -#include "common/hdfs_public_api.h" - -namespace hdfs { - -FileHandle::FileHandle(InputStream *is) : input_stream_(is), offset_(0){} - -Status FileHandle::Pread(void *buf, size_t *nbyte, off_t offset) { - auto callstate = std::make_shared<std::promise<std::tuple<Status, std::string, size_t>>>(); - std::future<std::tuple<Status, std::string, size_t>> future(callstate->get_future()); - - /* wrap async call with promise/future to make it blocking */ - auto callback = [callstate]( - const Status &s, const std::string &dn, size_t bytes) { - callstate->set_value(std::make_tuple(s, dn, bytes)); - }; - - input_stream_->PositionRead(buf, *nbyte, offset, callback); - - /* wait for async to finish */ - auto returnstate = future.get(); - auto stat = std::get<0>(returnstate); - - if (!stat.ok()) { - /* determine if DN gets marked bad */ - if (InputStream::ShouldExclude(stat)) { - InputStreamImpl *impl = - static_cast<InputStreamImpl *>(input_stream_.get()); - impl->bad_node_tracker_->AddBadNode(std::get<1>(returnstate)); - } - - return stat; - } - *nbyte = std::get<2>(returnstate); - return Status::OK(); -} - -Status FileHandle::Read(void *buf, size_t *nbyte) { - Status stat = Pread(buf, nbyte, offset_); - if (!stat.ok()) { - return stat; - } - - offset_ += *nbyte; - return Status::OK(); -} - -Status FileHandle::Seek(off_t *offset, std::ios_base::seekdir whence) { - off_t new_offset = -1; - - switch (whence) { - case std::ios_base::beg: - new_offset = *offset; - break; - case std::ios_base::cur: - new_offset = offset_ + *offset; - break; - case std::ios_base::end: - new_offset = static_cast<InputStreamImpl *>(input_stream_.get()) - ->get_file_length() + - *offset; - break; - default: - /* unsupported */ - return Status::InvalidArgument("Invalid Seek whence argument"); - } - - if (!CheckSeekBounds(new_offset)) { - return Status::InvalidArgument("Seek offset out of bounds"); - } - offset_ = new_offset; - - *offset = offset_; - return Status::OK(); -} - -/* return false if seek will be out of bounds */ -bool FileHandle::CheckSeekBounds(ssize_t desired_position) { - ssize_t file_length = - static_cast<InputStreamImpl *>(input_stream_.get())->get_file_length(); - - if (desired_position < 0 || desired_position >= file_length) { - return false; - } - - return true; -} - -bool FileHandle::IsOpenForRead() { - /* for now just check if InputStream exists */ - if (!input_stream_) { - return false; - } - return true; -} - -HadoopFileSystem::~HadoopFileSystem() { - /** - * Note: IoService must be stopped before getting rid of worker threads. - * Once worker threads are joined and deleted the service can be deleted. - **/ - - file_system_.reset(nullptr); - service_->Stop(); - worker_threads_.clear(); - service_.reset(nullptr); -} - -Status HadoopFileSystem::Connect(const char *nn, tPort port, - unsigned int threads) { - /* IoService::New can return nullptr */ - if (!service_) { - return Status::Error("Null IoService"); - } - /* spawn background threads for asio delegation */ - for (unsigned int i = 0; i < threads; i++) { - AddWorkerThread(); - } - /* synchronized */ - auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem*>>>(); - std::future<std::tuple<Status, FileSystem*>> future(callstate->get_future()); - - auto callback = [callstate](const Status &s, FileSystem *f) { - callstate->set_value(std::make_tuple(s,f)); - }; - - /* dummy options object until this is hooked up to HDFS-9117 */ - Options options_object; - FileSystem::New(service_.get(), options_object, nn, std::to_string(port), - callback); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - FileSystem *fs = std::get<1>(returnstate); - - /* check and see if it worked */ - if (!stat.ok() || !fs) { - service_->Stop(); - worker_threads_.clear(); - return stat; - } - - file_system_ = std::unique_ptr<FileSystem>(fs); - return stat; -} - -int HadoopFileSystem::AddWorkerThread() { - auto service_task = [](IoService *service) { service->Run(); }; - worker_threads_.push_back( - WorkerPtr(new std::thread(service_task, service_.get()))); - return worker_threads_.size(); -} - -Status HadoopFileSystem::OpenFileForRead(const std::string &path, - FileHandle **handle) { - auto callstate = std::make_shared<std::promise<std::tuple<Status, InputStream*>>>(); - std::future<std::tuple<Status, InputStream*>> future(callstate->get_future()); - - /* wrap async FileSystem::Open with promise to make it a blocking call */ - auto h = [callstate](const Status &s, InputStream *is) { - callstate->set_value(std::make_tuple(s, is)); - }; - - file_system_->Open(path, h); - - /* block until promise is set */ - auto returnstate = future.get(); - Status stat = std::get<0>(returnstate); - InputStream *input_stream = std::get<1>(returnstate); - - if (!stat.ok()) { - delete input_stream; - return stat; - } - if (!input_stream) { - return stat; - } - - *handle = new FileHandle(input_stream); - return stat; -} -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h deleted file mode 100644 index b36ee8f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs_cpp.h +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#ifndef LIBHDFSPP_BINDINGS_HDFSCPP_H -#define LIBHDFSPP_BINDINGS_HDFSCPP_H - -#include <cstdint> -#include <thread> -#include <vector> -#include <mutex> -#include <chrono> -#include <iostream> - -#include "libhdfspp/hdfs.h" -#include "fs/bad_datanode_tracker.h" -#include <hdfs/hdfs.h> - -namespace hdfs { - -/** - * Implement a very simple 'it just works' interface in C++ - * that provides posix-like file operations + extra stuff for hadoop. - * Then provide very thin C wrappers over each method. - */ - -class HadoopFileSystem; - -class FileHandle { - public: - virtual ~FileHandle(){}; - /** - * Note: The nbyte argument for Read and Pread as well as the - * offset argument for Seek are in/out parameters. - * - * For Read and Pread the value referenced by nbyte should - * be set to the number of bytes to read. Before returning - * the value referenced will be set by the callee to the number - * of bytes that was successfully read. - * - * For Seek the value referenced by offset should be the number - * of bytes to shift from the specified whence position. The - * referenced value will be set to the new offset before returning. - **/ - Status Pread(void *buf, size_t *nbyte, off_t offset); - Status Read(void *buf, size_t *nbyte); - Status Seek(off_t *offset, std::ios_base::seekdir whence); - bool IsOpenForRead(); - - private: - /* handle should only be created by fs */ - friend class HadoopFileSystem; - FileHandle(InputStream *is); - bool CheckSeekBounds(ssize_t desired_position); - std::unique_ptr<InputStream> input_stream_; - off_t offset_; -}; - -class HadoopFileSystem { - public: - HadoopFileSystem() : service_(IoService::New()) {} - virtual ~HadoopFileSystem(); - - /* attempt to connect to namenode, return false on failure */ - Status Connect(const char *nn, tPort port, unsigned int threads = 1); - - /* how many worker threads are servicing asio requests */ - int WorkerThreadCount() { return worker_threads_.size(); } - - /* add a new thread to handle asio requests, return number of threads in pool - */ - int AddWorkerThread(); - - Status OpenFileForRead(const std::string &path, FileHandle **handle); - - private: - std::unique_ptr<IoService> service_; - /* std::thread needs to join before deletion */ - struct WorkerDeleter { - void operator()(std::thread *t) { - t->join(); - delete t; - } - }; - typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr; - std::vector<WorkerPtr> worker_threads_; - std::unique_ptr<FileSystem> file_system_; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index 8dbcd03..0d3752a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -1 +1,18 @@ -add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc) +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +add_library(common base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc util.cc) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h new file mode 100644 index 0000000..575904c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/async_stream.h @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_COMMON_ASYNC_STREAM_H_ +#define LIB_COMMON_ASYNC_STREAM_H_ + +#include <asio.hpp> + +namespace hdfs { + +typedef asio::mutable_buffers_1 MutableBuffers; +typedef asio::const_buffers_1 ConstBuffers; + +/* + * asio-compatible stream implementation. + * + * Lifecycle: should be managed using std::shared_ptr so the object can be + * handed from consumer to consumer + * Threading model: async_read_some and async_write_some are not thread-safe. + */ +class AsyncStream { +public: + virtual void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) = 0; + + virtual void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) = 0; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h index 5630934..a5a0446 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/asio.h @@ -29,7 +29,9 @@ #include <asio/ip/tcp.hpp> namespace hdfs { -namespace continuation { +namespace asio_continuation { + +using namespace continuation; template <class Stream, class MutableBufferSequence> class ReadContinuation : public Continuation { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h index 08caf0d..54caeed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/protobuf.h @@ -33,7 +33,7 @@ namespace continuation { template <class Stream, size_t MaxMessageSize = 512> struct ReadDelimitedPBMessageContinuation : public Continuation { - ReadDelimitedPBMessageContinuation(Stream *stream, + ReadDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) : stream_(stream), msg_(msg) {} @@ -56,8 +56,8 @@ struct ReadDelimitedPBMessageContinuation : public Continuation { } next(status); }; - asio::async_read( - *stream_, asio::buffer(buf_), + asio::async_read(*stream_, + asio::buffer(buf_), std::bind(&ReadDelimitedPBMessageContinuation::CompletionHandler, this, std::placeholders::_1, std::placeholders::_2), handler); @@ -82,14 +82,14 @@ private: return offset ? len + offset - transferred : 1; } - Stream *stream_; + std::shared_ptr<Stream> stream_; ::google::protobuf::MessageLite *msg_; std::array<char, MaxMessageSize> buf_; }; template <class Stream> struct WriteDelimitedPBMessageContinuation : Continuation { - WriteDelimitedPBMessageContinuation(Stream *stream, + WriteDelimitedPBMessageContinuation(std::shared_ptr<Stream> stream, const google::protobuf::MessageLite *msg) : stream_(stream), msg_(msg) {} @@ -101,28 +101,25 @@ struct WriteDelimitedPBMessageContinuation : Continuation { pbio::CodedOutputStream os(&ss); os.WriteVarint32(size); msg_->SerializeToCodedStream(&os); - write_coroutine_ = - std::shared_ptr<Continuation>(Write(stream_, asio::buffer(buf_))); - write_coroutine_->Run([next](const Status &stat) { next(stat); }); + asio::async_write(*stream_, asio::buffer(buf_), [next](const asio::error_code &ec, size_t) { next(ToStatus(ec)); } ); } private: - Stream *stream_; + std::shared_ptr<Stream> stream_; const google::protobuf::MessageLite *msg_; std::string buf_; - std::shared_ptr<Continuation> write_coroutine_; }; template <class Stream, size_t MaxMessageSize = 512> static inline Continuation * -ReadDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { +ReadDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) { return new ReadDelimitedPBMessageContinuation<Stream, MaxMessageSize>(stream, msg); } template <class Stream> static inline Continuation * -WriteDelimitedPBMessage(Stream *stream, ::google::protobuf::MessageLite *msg) { +WriteDelimitedPBMessage(std::shared_ptr<Stream> stream, ::google::protobuf::MessageLite *msg) { return new WriteDelimitedPBMessageContinuation<Stream>(stream, msg); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc index 37408de..0251ce5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/hdfs_public_api.cc @@ -24,20 +24,4 @@ IoService::~IoService() {} IoService *IoService::New() { return new IoServiceImpl(); } -bool InputStream::ShouldExclude(const Status &s) { - if (s.ok()) { - return false; - } - - switch (s.code()) { - /* client side resource exhaustion */ - case Status::kResourceUnavailable: - return false; - case Status::kInvalidArgument: - case Status::kUnimplemented: - case Status::kException: - default: - return true; - } -} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc index e71b6b3..9e1acbd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/options.cc @@ -20,5 +20,6 @@ namespace hdfs { -Options::Options() : rpc_timeout(30000), host_exclusion_duration(600000) {} +Options::Options() : rpc_timeout(30000), max_rpc_retries(0), + rpc_retry_delay_ms(10000), host_exclusion_duration(600000) {} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc new file mode 100644 index 0000000..eaef2d0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.cc @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "common/util.h" + +namespace hdfs { + +std::string GetRandomClientName() { + unsigned char buf[6] = { + 0, + }; + RAND_pseudo_bytes(buf, sizeof(buf)); + + std::stringstream ss; + ss << "libhdfs++_" + << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf))); + return ss.str(); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h index ff9f36c..a6acc4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/util.h @@ -20,7 +20,10 @@ #include "libhdfspp/status.h" +#include <sstream> + #include <asio/error_code.hpp> +#include <openssl/rand.h> #include <google/protobuf/message_lite.h> #include <google/protobuf/io/coded_stream.h> @@ -53,6 +56,10 @@ static inline void ReadDelimitedPBMessage( std::string Base64Encode(const std::string &src); +/* + * Returns a new high-entropy client name + */ +std::string GetRandomClientName(); } #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt new file mode 100644 index 0000000..54ba96c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/CMakeLists.txt @@ -0,0 +1,2 @@ +add_library(connection datanodeconnection.cc) +add_dependencies(connection proto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc new file mode 100644 index 0000000..b053e7f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "datanodeconnection.h" +#include "common/util.h" + +namespace hdfs { + +DataNodeConnection::~DataNodeConnection(){} +DataNodeConnectionImpl::~DataNodeConnectionImpl(){} + +DataNodeConnectionImpl::DataNodeConnectionImpl(asio::io_service * io_service, + const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, + const hadoop::common::TokenProto *token) +{ + using namespace ::asio::ip; + + conn_.reset(new tcp::socket(*io_service)); + auto datanode_addr = dn_proto.id(); + endpoints_[0] = tcp::endpoint(address::from_string(datanode_addr.ipaddr()), + datanode_addr.xferport()); + uuid_ = dn_proto.id().datanodeuuid(); + + if (token) { + token_.reset(new hadoop::common::TokenProto()); + token_->CheckTypeAndMergeFrom(*token); + } +} + + +void DataNodeConnectionImpl::Connect( + std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) { + // Keep the DN from being freed until we're done + auto shared_this = shared_from_this(); + asio::async_connect(*conn_, endpoints_.begin(), endpoints_.end(), + [shared_this, handler](const asio::error_code &ec, std::array<asio::ip::tcp::endpoint, 1>::iterator it) { + (void)it; + handler(ToStatus(ec), shared_this); }); +} + + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h new file mode 100644 index 0000000..d5bbb92 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ +#define LIBHDFSPP_LIB_CONNECTION_DATANODECONNECTION_H_ + +#include "common/hdfs_public_api.h" +#include "common/async_stream.h" +#include "ClientNamenodeProtocol.pb.h" + +#include "asio.hpp" + +namespace hdfs { + +class DataNodeConnection : public AsyncStream { +public: + std::string uuid_; + std::unique_ptr<hadoop::common::TokenProto> token_; + + virtual ~DataNodeConnection(); + virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0; +}; + + +class DataNodeConnectionImpl : public DataNodeConnection, public std::enable_shared_from_this<DataNodeConnectionImpl>{ +public: + std::unique_ptr<asio::ip::tcp::socket> conn_; + std::array<asio::ip::tcp::endpoint, 1> endpoints_; + std::string uuid_; + + virtual ~DataNodeConnectionImpl(); + DataNodeConnectionImpl(asio::io_service * io_service, const ::hadoop::hdfs::DatanodeInfoProto &dn_proto, + const hadoop::common::TokenProto *token); + + void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override; + + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + conn_->async_read_some(buf, handler); + }; + + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + conn_->async_write_some(buf, handler); + } +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt index 0870fb3..ae56b3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/CMakeLists.txt @@ -1,2 +1,2 @@ -add_library(fs filesystem.cc inputstream.cc bad_datanode_tracker.cc) +add_library(fs filesystem.cc filehandle.cc bad_datanode_tracker.cc) add_dependencies(fs proto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc new file mode 100644 index 0000000..8cf41ce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "filehandle.h" +#include "common/continuation/continuation.h" +#include "connection/datanodeconnection.h" +#include "reader/block_reader.h" + +#include <future> +#include <tuple> + +namespace hdfs { + +using ::hadoop::hdfs::LocatedBlocksProto; + +FileHandle::~FileHandle() {} + +FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string &client_name, + const std::shared_ptr<const struct FileInfo> file_info, + std::shared_ptr<BadDataNodeTracker> bad_data_nodes) + : io_service_(io_service), client_name_(client_name), file_info_(file_info), + bad_node_tracker_(bad_data_nodes), offset_(0) { +} + +void FileHandleImpl::PositionRead( + void *buf, size_t nbyte, uint64_t offset, + const std::function<void(const Status &, size_t)> + &handler) { + + auto callback = [this, handler](const Status &status, + const std::string &contacted_datanode, + size_t bytes_read) { + /* determine if DN gets marked bad */ + if (ShouldExclude(status)) { + bad_node_tracker_->AddBadNode(contacted_datanode); + } + + handler(status, bytes_read); + }; + + AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, callback); +} + +Status FileHandleImpl::PositionRead(void *buf, size_t *nbyte, off_t offset) { + auto callstate = std::make_shared<std::promise<std::tuple<Status, size_t>>>(); + std::future<std::tuple<Status, size_t>> future(callstate->get_future()); + + /* wrap async call with promise/future to make it blocking */ + auto callback = [callstate](const Status &s, size_t bytes) { + callstate->set_value(std::make_tuple(s,bytes)); + }; + + PositionRead(buf, *nbyte, offset, callback); + + /* wait for async to finish */ + auto returnstate = future.get(); + auto stat = std::get<0>(returnstate); + + if (!stat.ok()) { + return stat; + } + + *nbyte = std::get<1>(returnstate); + return stat; +} + +Status FileHandleImpl::Read(void *buf, size_t *nbyte) { + Status stat = PositionRead(buf, nbyte, offset_); + if(!stat.ok()) { + return stat; + } + + offset_ += *nbyte; + return Status::OK(); +} + +Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) { + off_t new_offset = -1; + + switch (whence) { + case std::ios_base::beg: + new_offset = *offset; + break; + case std::ios_base::cur: + new_offset = offset_ + *offset; + break; + case std::ios_base::end: + new_offset = file_info_->file_length_ + *offset; + break; + default: + /* unsupported */ + return Status::InvalidArgument("Invalid Seek whence argument"); + } + + if(!CheckSeekBounds(new_offset)) { + return Status::InvalidArgument("Seek offset out of bounds"); + } + offset_ = new_offset; + + *offset = offset_; + return Status::OK(); +} + +/* return false if seek will be out of bounds */ +bool FileHandleImpl::CheckSeekBounds(ssize_t desired_position) { + ssize_t file_length = file_info_->file_length_; + + if (desired_position < 0 || desired_position >= file_length) { + return false; + } + + return true; +} + +/* + * Note that this method must be thread-safe w.r.t. the unsafe operations occurring + * on the FileHandle + */ +void FileHandleImpl::AsyncPreadSome( + size_t offset, const MutableBuffers &buffers, + std::shared_ptr<NodeExclusionRule> excluded_nodes, + const std::function<void(const Status &, const std::string &, size_t)> handler) { + using ::hadoop::hdfs::DatanodeInfoProto; + using ::hadoop::hdfs::LocatedBlockProto; + + /** + * Note: block and chosen_dn will end up pointing to things inside + * the blocks_ vector. They shouldn't be directly deleted. + **/ + auto block = std::find_if( + file_info_->blocks_.begin(), file_info_->blocks_.end(), [offset](const LocatedBlockProto &p) { + return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); + }); + + if (block == file_info_->blocks_.end()) { + handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); + return; + } + + /** + * If user supplies a rule use it, otherwise use the tracker. + * User is responsible for making sure one of them isn't null. + **/ + std::shared_ptr<NodeExclusionRule> rule = + excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_; + + auto datanodes = block->locs(); + auto it = std::find_if(datanodes.begin(), datanodes.end(), + [rule](const DatanodeInfoProto &dn) { + return !rule->IsBadNode(dn.id().datanodeuuid()); + }); + + if (it == datanodes.end()) { + handler(Status::ResourceUnavailable("No datanodes available"), "", 0); + return; + } + + DatanodeInfoProto &chosen_dn = *it; + + uint64_t offset_within_block = offset - block->offset(); + uint64_t size_within_block = std::min<uint64_t>( + block->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); + + // This is where we will put the logic for re-using a DN connection; we can + // steal the FileHandle's dn and put it back when we're done + std::shared_ptr<DataNodeConnection> dn = CreateDataNodeConnection(io_service_, chosen_dn, nullptr /*token*/); + std::string dn_id = dn->uuid_; + std::string client_name = client_name_; + + // Wrap the DN in a block reader to handle the state and logic of the + // block request protocol + std::shared_ptr<BlockReader> reader; + reader = CreateBlockReader(BlockReaderOptions(), dn); + + + auto read_handler = [reader, dn_id, handler](const Status & status, size_t transferred) { + handler(status, dn_id, transferred); + }; + + dn->Connect([handler,read_handler,block,offset_within_block,size_within_block, buffers, reader, dn_id, client_name] + (Status status, std::shared_ptr<DataNodeConnection> dn) { + (void)dn; + if (status.ok()) { + reader->AsyncReadBlock( + client_name, *block, offset_within_block, + asio::buffer(buffers, size_within_block), read_handler); + } else { + handler(status, dn_id, 0); + } + }); + + return; +} + +std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, + std::shared_ptr<DataNodeConnection> dn) +{ + return std::make_shared<BlockReaderImpl>(options, dn); +} + +std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection( + ::asio::io_service * io_service, + const ::hadoop::hdfs::DatanodeInfoProto & dn, + const hadoop::common::TokenProto * token) { + return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token); +} + +bool FileHandle::ShouldExclude(const Status &s) { + if (s.ok()) { + return false; + } + + switch (s.code()) { + /* client side resource exhaustion */ + case Status::kResourceUnavailable: + return false; + case Status::kInvalidArgument: + case Status::kUnimplemented: + case Status::kException: + default: + return true; + } +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h new file mode 100644 index 0000000..95b5869 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_LIB_FS_FILEHANDLE_H_ +#define LIBHDFSPP_LIB_FS_FILEHANDLE_H_ + +#include "common/hdfs_public_api.h" +#include "common/async_stream.h" +#include "reader/fileinfo.h" + +#include "asio.hpp" +#include "bad_datanode_tracker.h" +#include "ClientNamenodeProtocol.pb.h" + +#include <mutex> +#include <iostream> + +namespace hdfs { + +class BlockReader; +class BlockReaderOptions; +class DataNodeConnection; + +/* + * FileHandle: coordinates operations on a particular file in HDFS + * + * Threading model: not thread-safe; consumers and io_service should not call + * concurrently. PositionRead is the exceptions; they can be + * called concurrently and repeatedly. + * Lifetime: pointer returned to consumer by FileSystem::Open. Consumer is + * resonsible for freeing the object. + */ +class FileHandleImpl : public FileHandle { +public: + FileHandleImpl(::asio::io_service *io_service, const std::string &client_name, + const std::shared_ptr<const struct FileInfo> file_info, + std::shared_ptr<BadDataNodeTracker> bad_data_nodes); + + /* + * [Some day reliably] Reads a particular offset into the data file. + * On error, bytes_read returns the number of bytes successfully read; on + * success, bytes_read will equal nbyte + */ + void PositionRead( + void *buf, + size_t nbyte, + uint64_t offset, + const std::function<void(const Status &status, size_t bytes_read)> &handler + ) override; + + /** + * Note: The nbyte argument for Read and Pread as well as the + * offset argument for Seek are in/out parameters. + * + * For Read and Pread the value referenced by nbyte should + * be set to the number of bytes to read. Before returning + * the value referenced will be set by the callee to the number + * of bytes that was successfully read. + * + * For Seek the value referenced by offset should be the number + * of bytes to shift from the specified whence position. The + * referenced value will be set to the new offset before returning. + **/ + Status PositionRead(void *buf, size_t *bytes_read, off_t offset) override; + Status Read(void *buf, size_t *nbyte) override; + Status Seek(off_t *offset, std::ios_base::seekdir whence) override; + + + /* + * Reads some amount of data into the buffer. Will attempt to find the best + * datanode and read data from it. + * + * If an error occurs during connection or transfer, the callback will be + * called with bytes_read equal to the number of bytes successfully transferred. + * If no data nodes can be found, status will be Status::ResourceUnavailable. + * + */ + void AsyncPreadSome(size_t offset, const MutableBuffers &buffers, + std::shared_ptr<NodeExclusionRule> excluded_nodes, + const std::function<void(const Status &status, + const std::string &dn_id, size_t bytes_read)> handler); + +protected: + virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, + std::shared_ptr<DataNodeConnection> dn); + virtual std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( + ::asio::io_service *io_service, + const ::hadoop::hdfs::DatanodeInfoProto & dn, + const hadoop::common::TokenProto * token); +private: + ::asio::io_service * const io_service_; + const std::string client_name_; + const std::shared_ptr<const struct FileInfo> file_info_; + std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; + bool CheckSeekBounds(ssize_t desired_position); + off_t offset_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc index e0c27ac..7404606 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.cc @@ -22,7 +22,10 @@ #include <asio/ip/tcp.hpp> +#include <functional> #include <limits> +#include <future> +#include <tuple> namespace hdfs { @@ -32,38 +35,17 @@ static const int kNamenodeProtocolVersion = 1; using ::asio::ip::tcp; -FileSystem::~FileSystem() {} +/***************************************************************************** + * NAMENODE OPERATIONS + ****************************************************************************/ -void FileSystem::New( - IoService *io_service, const Options &options, const std::string &server, - const std::string &service, - const std::function<void(const Status &, FileSystem *)> &handler) { - FileSystemImpl *impl = new FileSystemImpl(io_service, options); - impl->Connect(server, service, [impl, handler](const Status &stat) { - if (stat.ok()) { - handler(stat, impl); - } else { - delete impl; - handler(stat, nullptr); - } - }); -} - -FileSystemImpl::FileSystemImpl(IoService *io_service, const Options &options) - : io_service_(static_cast<IoServiceImpl *>(io_service)), - engine_(&io_service_->io_service(), options, - RpcEngine::GetRandomClientName(), kNamenodeProtocol, - kNamenodeProtocolVersion), - namenode_(&engine_), - bad_node_tracker_(std::make_shared<BadDataNodeTracker>()) {} - -void FileSystemImpl::Connect(const std::string &server, +void NameNodeOperations::Connect(const std::string &server, const std::string &service, - std::function<void(const Status &)> &&handler) { - using namespace continuation; + std::function<void(const Status &)> &handler) { + using namespace asio_continuation; typedef std::vector<tcp::endpoint> State; auto m = Pipeline<State>::Create(); - m->Push(Resolve(&io_service_->io_service(), server, service, + m->Push(Resolve(io_service_, server, service, std::back_inserter(m->state()))) .Push(Bind([this, m](const Continuation::Next &next) { engine_.Connect(m->state().front(), next); @@ -76,9 +58,9 @@ void FileSystemImpl::Connect(const std::string &server, }); } -void FileSystemImpl::Open( - const std::string &path, - const std::function<void(const Status &, InputStream *)> &handler) { +void NameNodeOperations::GetBlockLocations(const std::string & path, + std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler) +{ using ::hadoop::hdfs::GetBlockLocationsRequestProto; using ::hadoop::hdfs::GetBlockLocationsResponseProto; @@ -99,10 +81,174 @@ void FileSystemImpl::Open( [this, s](const continuation::Continuation::Next &next) { namenode_.GetBlockLocations(&s->req, s->resp, next); })); + m->Run([this, handler](const Status &stat, const State &s) { - handler(stat, stat.ok() ? new InputStreamImpl(this, &s.resp->locations(), - bad_node_tracker_) + if (stat.ok()) { + auto file_info = std::make_shared<struct FileInfo>(); + auto locations = s.resp->locations(); + + file_info->file_length_ = locations.filelength(); + + for (const auto &block : locations.blocks()) { + file_info->blocks_.push_back(block); + } + + if (locations.has_lastblock() && locations.lastblock().b().numbytes()) { + file_info->blocks_.push_back(locations.lastblock()); + } + + handler(stat, file_info); + } else { + handler(stat, nullptr); + } + }); +} + + +/***************************************************************************** + * FILESYSTEM BASE CLASS + ****************************************************************************/ + +void FileSystem::New( + IoService *io_service, const Options &options, const std::string &server, + const std::string &service, + const std::function<void(const Status &, FileSystem *)> &handler) { + FileSystemImpl *impl = new FileSystemImpl(io_service, options); + impl->Connect(server, service, [impl, handler](const Status &stat) { + if (stat.ok()) { + handler(stat, impl); + } else { + delete impl; + handler(stat, nullptr); + } + }); +} + +FileSystem * FileSystem::New( + IoService *io_service, const Options &options, const std::string &server, + const std::string &service) { + auto callstate = std::make_shared<std::promise<std::tuple<Status, FileSystem *>>>(); + std::future<std::tuple<Status, FileSystem *>> future(callstate->get_future()); + + auto callback = [callstate](const Status &s, FileSystem * fs) { + callstate->set_value(std::make_tuple(s, fs)); + }; + + New(io_service, options, server, service, callback); + + /* block until promise is set */ + auto returnstate = future.get(); + + if (std::get<0>(returnstate).ok()) { + return std::get<1>(returnstate); + } else { + return nullptr; + } +} + +/***************************************************************************** + * FILESYSTEM IMPLEMENTATION + ****************************************************************************/ + +FileSystemImpl::FileSystemImpl(IoService *&io_service, const Options &options) + : io_service_(static_cast<IoServiceImpl *>(io_service)), + nn_(&io_service_->io_service(), options, + GetRandomClientName(), kNamenodeProtocol, + kNamenodeProtocolVersion), + client_name_(GetRandomClientName()) +{ + // Poor man's move + io_service = nullptr; + + /* spawn background threads for asio delegation */ + unsigned int threads = 1 /* options.io_threads_, pending HDFS-9117 */; + for (unsigned int i = 0; i < threads; i++) { + AddWorkerThread(); + } +} + +FileSystemImpl::~FileSystemImpl() { + /** + * Note: IoService must be stopped before getting rid of worker threads. + * Once worker threads are joined and deleted the service can be deleted. + **/ + io_service_->Stop(); + worker_threads_.clear(); + io_service_.reset(nullptr); +} + +void FileSystemImpl::Connect(const std::string &server, + const std::string &service, + std::function<void(const Status &)> &&handler) { + /* IoService::New can return nullptr */ + if (!io_service_) { + handler (Status::Error("Null IoService")); + } + nn_.Connect(server, service, handler); +} + +Status FileSystemImpl::Connect(const std::string &server, const std::string &service) { + /* synchronized */ + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future = stat->get_future(); + + auto callback = [stat](const Status &s) { + stat->set_value(s); + }; + + Connect(server, service, callback); + + /* block until promise is set */ + auto s = future.get(); + + return s; +} + + +int FileSystemImpl::AddWorkerThread() { + auto service_task = [](IoService *service) { service->Run(); }; + worker_threads_.push_back( + WorkerPtr(new std::thread(service_task, io_service_.get()))); + return worker_threads_.size(); +} + +void FileSystemImpl::Open( + const std::string &path, + const std::function<void(const Status &, FileHandle *)> &handler) { + + nn_.GetBlockLocations(path, [this, handler](const Status &stat, std::shared_ptr<const struct FileInfo> file_info) { + handler(stat, stat.ok() ? new FileHandleImpl(&io_service_->io_service(), client_name_, file_info, bad_node_tracker_) : nullptr); }); } + +Status FileSystemImpl::Open(const std::string &path, + FileHandle **handle) { + auto callstate = std::make_shared<std::promise<std::tuple<Status, FileHandle*>>>(); + std::future<std::tuple<Status, FileHandle*>> future(callstate->get_future()); + + /* wrap async FileSystem::Open with promise to make it a blocking call */ + auto h = [callstate](const Status &s, FileHandle *is) { + callstate->set_value(std::make_tuple(s, is)); + }; + + Open(path, h); + + /* block until promise is set */ + auto returnstate = future.get(); + Status stat = std::get<0>(returnstate); + FileHandle *file_handle = std::get<1>(returnstate); + + if (!stat.ok()) { + delete file_handle; + return stat; + } + if (!file_handle) { + return stat; + } + + *handle = file_handle; + return stat; +} + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h index dfe8b0c..cc8a8e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filesystem.h @@ -18,75 +18,110 @@ #ifndef LIBHDFSPP_LIB_FS_FILESYSTEM_H_ #define LIBHDFSPP_LIB_FS_FILESYSTEM_H_ +#include "filehandle.h" #include "common/hdfs_public_api.h" +#include "common/async_stream.h" #include "libhdfspp/hdfs.h" #include "fs/bad_datanode_tracker.h" #include "rpc/rpc_engine.h" +#include "reader/block_reader.h" +#include "reader/fileinfo.h" #include "ClientNamenodeProtocol.pb.h" #include "ClientNamenodeProtocol.hrpc.inl" +#include "asio.hpp" + +#include <thread> + namespace hdfs { -class FileHandle; -class HadoopFileSystem; +/** + * NameNodeConnection: abstracts the details of communicating with a NameNode + * and the implementation of the communications protocol. + * + * Will eventually handle retry and failover. + * + * Threading model: thread-safe; all operations can be called concurrently + * Lifetime: owned by a FileSystemImpl + */ +class NameNodeOperations { +public: + NameNodeOperations(::asio::io_service *io_service, const Options &options, + const std::string &client_name, const char *protocol_name, + int protocol_version) : + io_service_(io_service), + engine_(io_service, options, client_name, protocol_name, protocol_version), + namenode_(& engine_) {} + + void Connect(const std::string &server, + const std::string &service, + std::function<void(const Status &)> &handler); + + void GetBlockLocations(const std::string & path, + std::function<void(const Status &, std::shared_ptr<const struct FileInfo>)> handler); +private: + ::asio::io_service * io_service_; + RpcEngine engine_; + ClientNamenodeProtocol namenode_; +}; + +/* + * FileSystem: The consumer's main point of interaction with the cluster as + * a whole. + * + * Initially constructed in a disconnected state; call Connect before operating + * on the FileSystem. + * + * All open files must be closed before the FileSystem is destroyed. + * + * Threading model: thread-safe for all operations + * Lifetime: pointer created for consumer who is responsible for deleting it + */ class FileSystemImpl : public FileSystem { - public: - FileSystemImpl(IoService *io_service, const Options &options); +public: + FileSystemImpl(IoService *&io_service, const Options &options); + ~FileSystemImpl() override; + + /* attempt to connect to namenode, return bad status on failure */ void Connect(const std::string &server, const std::string &service, std::function<void(const Status &)> &&handler); + /* attempt to connect to namenode, return bad status on failure */ + Status Connect(const std::string &server, const std::string &service); + + virtual void Open(const std::string &path, - const std::function<void(const Status &, InputStream *)> & - handler) override; - RpcEngine &rpc_engine() { return engine_; } + const std::function<void(const Status &, FileHandle *)> + &handler) override; + Status Open(const std::string &path, FileHandle **handle) override; - private: - IoServiceImpl *io_service_; - RpcEngine engine_; - ClientNamenodeProtocol namenode_; - std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; -}; -class InputStreamImpl : public InputStream { - public: - InputStreamImpl(FileSystemImpl *fs, - const ::hadoop::hdfs::LocatedBlocksProto *blocks, - std::shared_ptr<BadDataNodeTracker> tracker); - virtual void PositionRead( - void *buf, size_t nbyte, uint64_t offset, - const std::function<void(const Status &, const std::string &, size_t)> & - handler) override; - /** - * If optional_rule_override is null then use the bad_datanode_tracker. If - * non-null use the provided NodeExclusionRule to determine eligible - * datanodes. - **/ - template <class MutableBufferSequence, class Handler> - void AsyncPreadSome(size_t offset, const MutableBufferSequence &buffers, - std::shared_ptr<NodeExclusionRule> excluded_nodes, - const Handler &handler); - - template <class BlockReaderTrait, class MutableBufferSequence, class Handler> - void AsyncReadBlock(const std::string &client_name, - const hadoop::hdfs::LocatedBlockProto &block, - const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, - const MutableBufferSequence &buffers, - const Handler &handler); - uint64_t get_file_length() const; - private: - FileSystemImpl *fs_; - unsigned long long file_length_; - std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; - template <class Reader> - struct HandshakeContinuation; - template <class Reader, class MutableBufferSequence> - struct ReadBlockContinuation; - struct RemoteBlockReaderTrait; - friend class FileHandle; + /* add a new thread to handle asio requests, return number of threads in pool + */ + int AddWorkerThread(); + + /* how many worker threads are servicing asio requests */ + int WorkerThreadCount() { return worker_threads_.size(); } + + +private: + std::unique_ptr<IoServiceImpl> io_service_; + NameNodeOperations nn_; + const std::string client_name_; std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; + + struct WorkerDeleter { + void operator()(std::thread *t) { + t->join(); + delete t; + } + }; + typedef std::unique_ptr<std::thread, WorkerDeleter> WorkerPtr; + std::vector<WorkerPtr> worker_threads_; + }; -} -#include "inputstream_impl.h" + +} #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc deleted file mode 100644 index 0b78c93..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream.cc +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "filesystem.h" - -namespace hdfs { - -using ::hadoop::hdfs::LocatedBlocksProto; - -InputStream::~InputStream() {} - -InputStreamImpl::InputStreamImpl(FileSystemImpl *fs, - const LocatedBlocksProto *blocks, - std::shared_ptr<BadDataNodeTracker> tracker) - : fs_(fs), file_length_(blocks->filelength()), bad_node_tracker_(tracker) { - for (const auto &block : blocks->blocks()) { - blocks_.push_back(block); - } - - if (blocks->has_lastblock() && blocks->lastblock().b().numbytes()) { - blocks_.push_back(blocks->lastblock()); - } -} - -void InputStreamImpl::PositionRead( - void *buf, size_t nbyte, uint64_t offset, - const std::function<void(const Status &, const std::string &, size_t)> & - handler) { - AsyncPreadSome(offset, asio::buffer(buf, nbyte), bad_node_tracker_, handler); -} - -uint64_t InputStreamImpl::get_file_length() const { return file_length_; } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h deleted file mode 100644 index 2f64d39..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/inputstream_impl.h +++ /dev/null @@ -1,207 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef FS_INPUTSTREAM_IMPL_H_ -#define FS_INPUTSTREAM_IMPL_H_ - -#include "reader/block_reader.h" - -#include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include <functional> -#include <future> -#include <type_traits> -#include <algorithm> - -namespace hdfs { - -struct InputStreamImpl::RemoteBlockReaderTrait { - typedef RemoteBlockReader<asio::ip::tcp::socket> Reader; - struct State { - std::unique_ptr<asio::ip::tcp::socket> conn_; - std::shared_ptr<Reader> reader_; - std::array<asio::ip::tcp::endpoint, 1> endpoints_; - size_t transferred_; - Reader *reader() { return reader_.get(); } - size_t *transferred() { return &transferred_; } - const size_t *transferred() const { return &transferred_; } - }; - static continuation::Pipeline<State> *CreatePipeline( - ::asio::io_service *io_service, - const ::hadoop::hdfs::DatanodeInfoProto &dn) { - using namespace ::asio::ip; - auto m = continuation::Pipeline<State>::Create(); - auto &s = m->state(); - s.conn_.reset(new tcp::socket(*io_service)); - s.reader_ = std::make_shared<Reader>(BlockReaderOptions(), s.conn_.get()); - auto datanode = dn.id(); - s.endpoints_[0] = tcp::endpoint(address::from_string(datanode.ipaddr()), - datanode.xferport()); - - m->Push(continuation::Connect(s.conn_.get(), s.endpoints_.begin(), - s.endpoints_.end())); - return m; - } -}; - -template <class Reader> -struct InputStreamImpl::HandshakeContinuation : continuation::Continuation { - HandshakeContinuation(Reader *reader, const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, uint64_t offset) - : reader_(reader), - client_name_(client_name), - length_(length), - offset_(offset) { - if (token) { - token_.reset(new hadoop::common::TokenProto()); - token_->CheckTypeAndMergeFrom(*token); - } - block_.CheckTypeAndMergeFrom(*block); - } - - virtual void Run(const Next &next) override { - reader_->async_connect(client_name_, token_.get(), &block_, length_, - offset_, next); - } - - private: - Reader *reader_; - const std::string client_name_; - std::unique_ptr<hadoop::common::TokenProto> token_; - hadoop::hdfs::ExtendedBlockProto block_; - uint64_t length_; - uint64_t offset_; -}; - -template <class Reader, class MutableBufferSequence> -struct InputStreamImpl::ReadBlockContinuation : continuation::Continuation { - ReadBlockContinuation(Reader *reader, MutableBufferSequence buffer, - size_t *transferred) - : reader_(reader), - buffer_(buffer), - buffer_size_(asio::buffer_size(buffer)), - transferred_(transferred) { - static_assert(!std::is_reference<MutableBufferSequence>::value, - "Buffer must not be a reference type"); - } - - virtual void Run(const Next &next) override { - *transferred_ = 0; - next_ = next; - OnReadData(Status::OK(), 0); - } - - private: - Reader *reader_; - const MutableBufferSequence buffer_; - const size_t buffer_size_; - size_t *transferred_; - std::function<void(const Status &)> next_; - - void OnReadData(const Status &status, size_t transferred) { - using std::placeholders::_1; - using std::placeholders::_2; - *transferred_ += transferred; - if (!status.ok()) { - next_(status); - } else if (*transferred_ >= buffer_size_) { - next_(status); - } else { - reader_->async_read_some( - asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), - std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); - } - } -}; - -template <class MutableBufferSequence, class Handler> -void InputStreamImpl::AsyncPreadSome( - size_t offset, const MutableBufferSequence &buffers, - std::shared_ptr<NodeExclusionRule> excluded_nodes, const Handler &handler) { - using ::hadoop::hdfs::DatanodeInfoProto; - using ::hadoop::hdfs::LocatedBlockProto; - - /** - * Note: block and chosen_dn will end up pointing to things inside - * the blocks_ vector. They shouldn't be directly deleted. - **/ - auto block = std::find_if( - blocks_.begin(), blocks_.end(), [offset](const LocatedBlockProto &p) { - return p.offset() <= offset && offset < p.offset() + p.b().numbytes(); - }); - - if (block == blocks_.end()) { - handler(Status::InvalidArgument("Cannot find corresponding blocks"), "", 0); - return; - } - - /** - * If user supplies a rule use it, otherwise use the tracker. - * User is responsible for making sure one of them isn't null. - **/ - std::shared_ptr<NodeExclusionRule> rule = - excluded_nodes != nullptr ? excluded_nodes : bad_node_tracker_; - - auto datanodes = block->locs(); - auto it = std::find_if(datanodes.begin(), datanodes.end(), - [rule](const DatanodeInfoProto &dn) { - return !rule->IsBadNode(dn.id().datanodeuuid()); - }); - - if (it == datanodes.end()) { - handler(Status::ResourceUnavailable("No datanodes available"), "", 0); - return; - } - - DatanodeInfoProto *chosen_dn = &*it; - - uint64_t offset_within_block = offset - block->offset(); - uint64_t size_within_block = std::min<uint64_t>( - block->b().numbytes() - offset_within_block, asio::buffer_size(buffers)); - - AsyncReadBlock<RemoteBlockReaderTrait>( - fs_->rpc_engine().client_name(), *block, *chosen_dn, offset_within_block, - asio::buffer(buffers, size_within_block), handler); -} - -template <class BlockReaderTrait, class MutableBufferSequence, class Handler> -void InputStreamImpl::AsyncReadBlock( - const std::string &client_name, - const hadoop::hdfs::LocatedBlockProto &block, - const hadoop::hdfs::DatanodeInfoProto &dn, size_t offset, - const MutableBufferSequence &buffers, const Handler &handler) { - typedef typename BlockReaderTrait::Reader Reader; - auto m = - BlockReaderTrait::CreatePipeline(&fs_->rpc_engine().io_service(), dn); - auto &s = m->state(); - size_t size = asio::buffer_size(buffers); - m->Push(new HandshakeContinuation<Reader>(s.reader(), client_name, nullptr, - &block.b(), size, offset)) - .Push(new ReadBlockContinuation<Reader, MutableBufferSequence>( - s.reader(), buffers, s.transferred())); - const std::string &dnid = dn.id().datanodeuuid(); - m->Run([handler, dnid](const Status &status, - const typename BlockReaderTrait::State &state) { - handler(status, dnid, *state.transferred()); - }); -} -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt index 71e28ac..0dcae29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt @@ -16,5 +16,5 @@ # limitations under the License. # -add_library(reader remote_block_reader.cc datatransfer.cc) +add_library(reader block_reader.cc datatransfer.cc) add_dependencies(reader proto)
