Repository: hadoop Updated Branches: refs/heads/HDFS-8707 011e79c83 -> 6df167c85
HDFS-9643. libhdfs++: Support async cancellation of read operations. Contributed by James Clampffer. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6df167c8 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6df167c8 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6df167c8 Branch: refs/heads/HDFS-8707 Commit: 6df167c851dfd8af805b7423ae8f82a120e39f29 Parents: 011e79c Author: James <[email protected]> Authored: Sat Jan 23 23:45:30 2016 -0500 Committer: James <[email protected]> Committed: Sat Jan 23 23:45:30 2016 -0500 ---------------------------------------------------------------------- .../native/libhdfspp/include/hdfspp/hdfs_ext.h | 10 +- .../native/libhdfspp/include/hdfspp/hdfspp.h | 6 + .../native/libhdfspp/include/hdfspp/status.h | 3 + .../native/libhdfspp/lib/bindings/c/hdfs.cc | 7 + .../native/libhdfspp/lib/common/CMakeLists.txt | 2 +- .../libhdfspp/lib/common/cancel_tracker.cc | 37 ++++++ .../libhdfspp/lib/common/cancel_tracker.h | 40 ++++++ .../lib/common/continuation/continuation.h | 15 ++- .../lib/connection/datanodeconnection.cc | 6 + .../lib/connection/datanodeconnection.h | 3 + .../main/native/libhdfspp/lib/fs/filehandle.cc | 35 ++++- .../main/native/libhdfspp/lib/fs/filehandle.h | 12 ++ .../native/libhdfspp/lib/reader/CMakeLists.txt | 2 +- .../native/libhdfspp/lib/reader/block_reader.cc | 14 +- .../native/libhdfspp/lib/reader/block_reader.h | 11 +- .../native/libhdfspp/lib/reader/datatransfer.h | 2 + .../libhdfspp/lib/reader/datatransfer_impl.h | 5 + .../native/libhdfspp/lib/reader/readergroup.cc | 55 ++++++++ .../native/libhdfspp/lib/reader/readergroup.h | 52 ++++++++ .../native/libhdfspp/tests/bad_datanode_test.cc | 8 ++ .../libhdfspp/tests/remote_block_reader_test.cc | 128 ++++++++++++++++++- 21 files changed, 433 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h index 8f4548d..3017fe1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfs_ext.h @@ -64,6 +64,15 @@ void hdfsGetLastError(char *buf, int len); /** + * Cancels operations being made by the FileHandle. + * Note: Cancel cannot be reversed. This is intended + * to be used before hdfsClose to avoid waiting for + * operations to complete. + **/ +LIBHDFS_EXTERNAL +int hdfsCancel(hdfsFS fs, hdfsFile file); + +/** * Create an HDFS builder, using the configuration XML files from the indicated * directory. If the directory does not exist, or contains no configuration * XML files, a Builder using all default values will be returned. 
@@ -99,6 +108,5 @@ int hdfsBuilderConfGetStr(struct hdfsBuilder *bld, const char *key, */ int hdfsBuilderConfGetInt(struct hdfsBuilder *bld, const char *key, int32_t *val); - } /* end extern "C" */ #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h index effdecb..2cbb62c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/hdfspp.h @@ -93,6 +93,12 @@ public: virtual Status Seek(off_t *offset, std::ios_base::seekdir whence) = 0; /** + * Cancel outstanding file operations. This is not reversible, once called + * the handle should be disposed of. + **/ + virtual void CancelOperations(void) = 0; + + /** * Determine if a datanode should be excluded from future operations * based on the return Status.
* http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h index 6b58799..89be771 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/include/hdfspp/status.h @@ -47,6 +47,8 @@ class Status { { return Status(kException, expception_class_name, error_message); } static Status Error(const char *error_message) { return Exception("Exception", error_message); } + static Status Canceled() + { return Status(kOperationCanceled,""); } // Returns true iff the status indicates success. bool ok() const { return (state_ == NULL); } @@ -64,6 +66,7 @@ class Status { kInvalidArgument = static_cast<unsigned>(std::errc::invalid_argument), kResourceUnavailable = static_cast<unsigned>(std::errc::resource_unavailable_try_again), kUnimplemented = static_cast<unsigned>(std::errc::function_not_supported), + kOperationCanceled = static_cast<unsigned>(std::errc::operation_canceled), kException = 255, }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc index 7798680..d5b5d6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/bindings/c/hdfs.cc @@ -267,6 +267,13 @@ tOffset hdfsTell(hdfsFS fs, hdfsFile file) { return offset; } +int hdfsCancel(hdfsFS fs, hdfsFile file) { + if (!CheckSystemAndHandle(fs, file)) { + return -1; + } + static_cast<FileHandleImpl*>(file->get_impl())->CancelOperations(); + return 0; +} /******************************************************************* * BUILDER INTERFACE http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt index c02e3db..0344a7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/CMakeLists.txt @@ -19,6 +19,6 @@ if(NEED_LINK_DL) set(LIB_DL dl) endif() -add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc util.cc retry_policy.cc) +add_library(common_obj OBJECT base64.cc status.cc sasl_digest_md5.cc hdfs_public_api.cc options.cc configuration.cc configuration_loader.cc hdfs_configuration.cc util.cc retry_policy.cc cancel_tracker.cc) add_library(common $<TARGET_OBJECTS:common_obj>) target_link_libraries(common ${LIB_DL}) http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc new file mode 100644 index 0000000..0f60ed7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.cc @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#include "cancel_tracker.h" + +namespace hdfs { + +CancelTracker::CancelTracker() : canceled_(false) {} + +std::shared_ptr<CancelTracker> CancelTracker::New() { + return std::make_shared<CancelTracker>(); +} + +bool CancelTracker::is_canceled() { + return canceled_; +} + +void CancelTracker::set_canceled() { + canceled_ = true; +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h new file mode 100644 index 0000000..ba61926 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/cancel_tracker.h @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +#ifndef COMMON_CANCELTRACKER_H +#define COMMON_CANCELTRACKER_H + +#include <memory> +#include <atomic> + +namespace hdfs { + +class CancelTracker : public std::enable_shared_from_this<CancelTracker> { + public: + CancelTracker(); + static std::shared_ptr<CancelTracker> New(); + void set_canceled(); + bool is_canceled(); + private: + std::atomic_bool canceled_; +}; + +typedef std::shared_ptr<CancelTracker> CancelHandle; + +} +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h index 58a1b46..4c9b8ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/common/continuation/continuation.h @@ -19,6 +19,7 @@ #define LIB_COMMON_CONTINUATION_CONTINUATION_H_ #include "hdfspp/status.h" +#include "common/cancel_tracker.h" #include <functional> #include <memory> @@ -81,6 +82,9 @@ template <class State> class Pipeline { public: typedef std::function<void(const Status &, const State &)> UserHandler; static Pipeline *Create() { return new Pipeline(); } + static Pipeline *Create(CancelHandle cancel_handle) { + return new Pipeline(cancel_handle); + } Pipeline &Push(Continuation *stage); void Run(UserHandler &&handler); State &state() { return state_; } @@ -91,9 +95,11 @@ private: size_t stage_; std::function<void(const Status &, const State &)> handler_; - Pipeline() : stage_(0) {} + Pipeline() : stage_(0), cancel_handle_(CancelTracker::New()) {} + Pipeline(CancelHandle cancel_handle) : stage_(0), 
cancel_handle_(cancel_handle) {} ~Pipeline() = default; void Schedule(const Status &status); + CancelHandle cancel_handle_; }; template <class State> @@ -104,7 +110,12 @@ inline Pipeline<State> &Pipeline<State>::Push(Continuation *stage) { template <class State> inline void Pipeline<State>::Schedule(const Status &status) { - if (!status.ok() || stage_ >= routines_.size()) { + // catch cancelation signalled from outside of pipeline + if(cancel_handle_->is_canceled()) { + handler_(Status::Canceled(), state_); + routines_.clear(); + delete this; + } else if (!status.ok() || stage_ >= routines_.size()) { handler_(status, state_); routines_.clear(); delete this; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc index b053e7f..247c75e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.cc @@ -53,5 +53,11 @@ void DataNodeConnectionImpl::Connect( handler(ToStatus(ec), shared_this); }); } +void DataNodeConnectionImpl::Cancel() { + // best to do a shutdown() first for portability + conn_->shutdown(asio::ip::tcp::socket::shutdown_both); + conn_->close(); +} + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h index d5bbb92..8f64110 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/connection/datanodeconnection.h @@ -33,6 +33,7 @@ public: virtual ~DataNodeConnection(); virtual void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) = 0; + virtual void Cancel() = 0; }; @@ -48,6 +49,8 @@ public: void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override; + void Cancel() override; + void async_read_some(const MutableBuffers &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc index 758fe5c..de9ccca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.cc @@ -34,13 +34,17 @@ FileHandleImpl::FileHandleImpl(::asio::io_service *io_service, const std::string const std::shared_ptr<const struct FileInfo> file_info, std::shared_ptr<BadDataNodeTracker> bad_data_nodes) : io_service_(io_service), client_name_(client_name), file_info_(file_info), - bad_node_tracker_(bad_data_nodes), offset_(0) { + 
bad_node_tracker_(bad_data_nodes), offset_(0), cancel_state_(CancelTracker::New()) { } void FileHandleImpl::PositionRead( void *buf, size_t nbyte, uint64_t offset, - const std::function<void(const Status &, size_t)> - &handler) { + const std::function<void(const Status &, size_t)> &handler) { + /* prevent usage after cancelation */ + if(cancel_state_->is_canceled()) { + handler(Status::Canceled(), 0); + return; + } auto callback = [this, handler](const Status &status, const std::string &contacted_datanode, @@ -90,6 +94,10 @@ Status FileHandleImpl::Read(void *buf, size_t *nbyte) { } Status FileHandleImpl::Seek(off_t *offset, std::ios_base::seekdir whence) { + if(cancel_state_->is_canceled()) { + return Status::Canceled(); + } + off_t new_offset = -1; switch (whence) { @@ -138,6 +146,11 @@ void FileHandleImpl::AsyncPreadSome( using ::hadoop::hdfs::DatanodeInfoProto; using ::hadoop::hdfs::LocatedBlockProto; + if(cancel_state_->is_canceled()) { + handler(Status::Canceled(), "", 0); + return; + } + /** * Note: block and chosen_dn will end up pointing to things inside * the blocks_ vector. They shouldn't be directly deleted. 
@@ -210,7 +223,9 @@ void FileHandleImpl::AsyncPreadSome( std::shared_ptr<BlockReader> FileHandleImpl::CreateBlockReader(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn) { - return std::make_shared<BlockReaderImpl>(options, dn); + std::shared_ptr<BlockReader> reader = std::make_shared<BlockReaderImpl>(options, dn, cancel_state_); + readers_.AddReader(reader); + return reader; } std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection( @@ -220,6 +235,17 @@ std::shared_ptr<DataNodeConnection> FileHandleImpl::CreateDataNodeConnection( return std::make_shared<DataNodeConnectionImpl>(io_service, dn, token); } +void FileHandleImpl::CancelOperations() { + cancel_state_->set_canceled(); + + /* Push update to BlockReaders that may be hung in an asio call */ + std::vector<std::shared_ptr<BlockReader>> live_readers = readers_.GetLiveReaders(); + for(auto reader : live_readers) { + reader->CancelOperation(); + } +} + + bool FileHandle::ShouldExclude(const Status &s) { if (s.ok()) { return false; @@ -228,6 +254,7 @@ bool FileHandle::ShouldExclude(const Status &s) { switch (s.code()) { /* client side resource exhaustion */ case Status::kResourceUnavailable: + case Status::kOperationCanceled: return false; case Status::kInvalidArgument: case Status::kUnimplemented: http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h index 95b5869..8c03b37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/fs/filehandle.h @@ -20,7 +20,9 @@ #include 
"common/hdfs_public_api.h" #include "common/async_stream.h" +#include "common/cancel_tracker.h" #include "reader/fileinfo.h" +#include "reader/readergroup.h" #include "asio.hpp" #include "bad_datanode_tracker.h" @@ -94,6 +96,14 @@ public: const std::function<void(const Status &status, const std::string &dn_id, size_t bytes_read)> handler); + + /** + * Cancels all operations instantiated from this FileHandle. + * Will set a flag to abort continuation pipelines when they try to move to the next step. + * Closes TCP connections to Datanode in order to abort pipelines waiting on slow IO. + **/ + virtual void CancelOperations(void) override; + protected: virtual std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn); @@ -108,6 +118,8 @@ private: std::shared_ptr<BadDataNodeTracker> bad_node_tracker_; bool CheckSeekBounds(ssize_t desired_position); off_t offset_; + CancelHandle cancel_state_; + ReaderGroup readers_; }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt index d4bb837..2bcfd92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt @@ -16,6 +16,6 @@ # limitations under the License. 
# -add_library(reader_obj OBJECT block_reader.cc datatransfer.cc) +add_library(reader_obj OBJECT block_reader.cc datatransfer.cc readergroup.cc) add_dependencies(reader_obj proto) add_library(reader $<TARGET_OBJECTS:reader_obj>) http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc index a4e21de..594aaf5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -22,6 +22,7 @@ #include <future> + namespace hdfs { hadoop::hdfs::OpReadBlockProto @@ -65,7 +66,7 @@ void BlockReaderImpl::AsyncRequestBlock( hadoop::hdfs::BlockOpResponseProto response; }; - auto m = continuation::Pipeline<State>::Create(); + auto m = continuation::Pipeline<State>::Create(cancel_state_); State *s = &m->state(); s->header.insert(s->header.begin(), @@ -287,7 +288,7 @@ struct BlockReaderImpl::AckRead : continuation::Continuation { } auto m = - continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(); + continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(parent_->cancel_state_); m->state().set_status(parent_->options_.verify_checksum ? 
hadoop::hdfs::Status::CHECKSUM_OK : hadoop::hdfs::Status::SUCCESS); @@ -316,7 +317,7 @@ void BlockReaderImpl::AsyncReadPacket( struct State { std::shared_ptr<size_t> bytes_transferred; }; - auto m = continuation::Pipeline<State>::Create(); + auto m = continuation::Pipeline<State>::Create(cancel_state_); m->state().bytes_transferred = std::make_shared<size_t>(0); m->Push(new ReadPacketHeader(this)) @@ -415,7 +416,7 @@ void BlockReaderImpl::AsyncReadBlock( const MutableBuffers &buffers, const std::function<void(const Status &, size_t)> handler) { - auto m = continuation::Pipeline<size_t>::Create(); + auto m = continuation::Pipeline<size_t>::Create(cancel_state_); size_t * bytesTransferred = &m->state(); size_t size = asio::buffer_size(buffers); @@ -430,4 +431,9 @@ void BlockReaderImpl::AsyncReadBlock( }); } +void BlockReaderImpl::CancelOperation() { + /* just forward cancel to DNConnection */ + dn_->Cancel(); +} + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h index 7984c7e..3d5ece4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -20,6 +20,7 @@ #include "hdfspp/status.h" #include "common/async_stream.h" +#include "common/cancel_tracker.h" #include "datatransfer.pb.h" #include "connection/datanodeconnection.h" @@ -82,14 +83,17 @@ public: uint64_t length, uint64_t offset, const std::function<void(Status)> &handler) = 0; + + virtual void CancelOperation() = 0; }; class BlockReaderImpl : public BlockReader, public 
std::enable_shared_from_this<BlockReaderImpl> { public: - explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn) + explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn, + CancelHandle cancel_state) : dn_(dn), state_(kOpen), options_(options), - chunk_padding_bytes_(0) {} + chunk_padding_bytes_(0), cancel_state_(cancel_state) {} virtual void AsyncReadPacket( const MutableBuffers &buffers, @@ -108,6 +112,8 @@ public: const MutableBuffers &buffers, const std::function<void(const Status &, size_t)> handler) override; + virtual void CancelOperation() override; + size_t ReadPacket(const MutableBuffers &buffers, Status *status); Status RequestBlock( @@ -143,6 +149,7 @@ private: int chunk_padding_bytes_; long long bytes_to_read_; std::vector<char> checksum_; + CancelHandle cancel_state_; }; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h index 8be9ef8..93103c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h @@ -58,6 +58,8 @@ public: void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override {(void)handler; /*TODO: Handshaking goes here*/}; + + void Cancel(); private: DataTransferSaslStream(const DataTransferSaslStream &) = delete; DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h index 3ca16e9..e2cd790 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h @@ -126,6 +126,11 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { m->Run([next](const Status &status, const State &) { next(status); }); } +template <class Stream> +void DataTransferSaslStream<Stream>::Cancel() { + /* implement with secured reads */ +} + } #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc new file mode 100644 index 0000000..a64800a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.cc @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "readergroup.h" + +#include <algorithm> + +namespace hdfs { + +void ReaderGroup::AddReader(std::shared_ptr<BlockReader> reader) { + std::lock_guard<std::recursive_mutex> state_lock(state_lock_); + ClearDeadReaders(); + std::weak_ptr<BlockReader> weak_ref = reader; + readers_.push_back(weak_ref); +} + +std::vector<std::shared_ptr<BlockReader>> ReaderGroup::GetLiveReaders() { + std::lock_guard<std::recursive_mutex> state_lock(state_lock_); + + std::vector<std::shared_ptr<BlockReader>> live_readers; + for(auto it=readers_.begin(); it != readers_.end(); it++) { + std::shared_ptr<BlockReader> live_reader = it->lock(); + if(live_reader) { + live_readers.push_back(live_reader); + } + } + return live_readers; +} + +void ReaderGroup::ClearDeadReaders() { + std::lock_guard<std::recursive_mutex> state_lock(state_lock_); + + auto reader_is_dead = [](const std::weak_ptr<BlockReader> &ptr) { + return ptr.expired(); + }; + + auto it = std::remove_if(readers_.begin(), readers_.end(), reader_is_dead); + readers_.erase(it, readers_.end()); +} + +} // end namespace hdfs http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h new file mode 100644 index 0000000..e6173f7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/readergroup.h @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef READER_READER_GROUP_H_ +#define READER_READER_GROUP_H_ + +#include "block_reader.h" + +#include <memory> +#include <vector> +#include <mutex> + +namespace hdfs { + +/** + * Provide a way of logically grouping ephemeral block readers + * so that their status can be monitored or changed. + * + * Note: This does not attempt to extend the reader life + * cycle. Readers are assumed to be owned by something else + * using a shared_ptr. 
+ **/ + +class ReaderGroup { + public: + ReaderGroup() {}; + void AddReader(std::shared_ptr<BlockReader> reader); + /* find live readers, promote to shared_ptr */ + std::vector<std::shared_ptr<BlockReader>> GetLiveReaders(); + private: + /* remove weak_ptrs that don't point to live object */ + void ClearDeadReaders(); + std::recursive_mutex state_lock_; + std::vector<std::weak_ptr<BlockReader>> readers_; +}; + +} // end namespace hdfs +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index 771d85f..4741817 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -55,6 +55,10 @@ public: size_t offset, const MutableBuffers &buffers, const std::function<void(const Status &, size_t)> handler)); + + virtual void CancelOperation() override { + /* no-op, declared pure virtual */ + } }; class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> { @@ -75,6 +79,10 @@ class MockDNConnection : public DataNodeConnection, public std::enable_shared_fr (void)buf; handler(asio::error::fault, 0); } + + virtual void Cancel() override { + /* no-op, declared pure virtual */ + } }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6df167c8/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc ---------------------------------------------------------------------- diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc index 7d926ba..80127f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc @@ -20,6 +20,7 @@ #include "datatransfer.pb.h" #include "common/util.h" +#include "common/cancel_tracker.h" #include "reader/block_reader.h" #include "reader/datatransfer.h" #include "reader/fileinfo.h" @@ -29,6 +30,8 @@ #include <gmock/gmock.h> #include <gtest/gtest.h> +#include <iostream> + using namespace hdfs; using ::hadoop::common::TokenProto; @@ -58,14 +61,18 @@ namespace hdfs { class MockDNConnection : public MockConnectionBase, public DataNodeConnection{ public: MockDNConnection(::asio::io_service &io_service) - : MockConnectionBase(&io_service) {} + : MockConnectionBase(&io_service), OnRead([](){}) {} MOCK_METHOD0(Produce, ProducerResult()); MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>)); + /* event handler to trigger side effects */ + std::function<void(void)> OnRead; + void async_read_some(const MutableBuffers &buf, std::function<void (const asio::error_code & error, std::size_t bytes_transferred) > handler) override { + this->OnRead(); this->MockConnectionBase::async_read_some(buf, handler); } @@ -74,6 +81,10 @@ public: std::size_t bytes_transferred) > handler) override { this->MockConnectionBase::async_write_some(buf, handler); } + + void Cancel() { + /* no-op, declared pure virtual */ + } }; // Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we @@ -81,7 +92,7 @@ public: class PartialMockReader : public BlockReaderImpl { public: PartialMockReader() : - BlockReaderImpl(BlockReaderOptions(), 
std::shared_ptr<DataNodeConnection>()) {}; + BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>(), CancelTracker::New()) {}; MOCK_METHOD2( AsyncReadPacket, @@ -221,9 +232,9 @@ template <class Stream = MockDNConnection, class Handler> static std::shared_ptr<BlockReaderImpl> ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block, uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, - const Handler &handler) { + const Handler &handler, CancelHandle cancel_handle = CancelTracker::New()) { BlockReaderOptions options; - auto reader = std::make_shared<BlockReaderImpl>(options, conn); + auto reader = std::make_shared<BlockReaderImpl>(options, conn, cancel_handle); Status result; reader->AsyncRequestBlock("libhdfs++", &block, length, offset, [buf, reader, handler](const Status &stat) { @@ -268,6 +279,59 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) { ASSERT_TRUE(done); } +/* used for cancelation tests, global to avoid cluttering capture lists */ +CancelHandle packet_canceller; + +TEST(RemoteBlockReaderTest, TestCancelWhileReceiving) { + packet_canceller = CancelTracker::New(); + + static const size_t kChunkSize = 512; + static const string kChunkData(kChunkSize, 'a'); + ::asio::io_service io_service; + auto conn = std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + + /** + * async_read would normally get called 5 times here; once for each + * continuation in the pipeline. Cancel will be triggered on the + * fourth call to catch the pipeline mid-execution. 
+ **/ + int call_count = 0; + int trigger_at_count = 4; + auto cancel_trigger = [&call_count, &trigger_at_count]() { + call_count += 1; + std::cout << "read called " << call_count << " times" << std::endl; + if(call_count == trigger_at_count) + packet_canceller->set_canceled(); + }; + + conn->OnRead = cancel_trigger; + + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); + + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + bool done = false; + std::string data(kChunkSize, 0); + ReadContent(conn, block, kChunkSize, 0, + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service, &done](const Status &stat, size_t transferred) { + ASSERT_EQ(stat.code(), Status::kOperationCanceled); + ASSERT_EQ(0, transferred); + done = true; + io_service.stop(); + }, packet_canceller); + + io_service.run(); + ASSERT_TRUE(done); +} + TEST(RemoteBlockReaderTest, TestReadWithinChunk) { static const size_t kChunkSize = 1024; static const size_t kLength = kChunkSize / 4 * 3; @@ -332,7 +396,7 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { string data(kChunkSize, 0); mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); BlockReaderOptions options; - auto reader = std::make_shared<BlockReaderImpl>(options, conn); + auto reader = std::make_shared<BlockReaderImpl>(options, conn, CancelTracker::New()); Status result; reader->AsyncRequestBlock( "libhdfs++", &block, data.size(), 0, @@ -358,6 +422,60 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { io_service.run(); } +TEST(RemoteBlockReaderTest, TestReadCancelBetweenPackets) { + packet_canceller = CancelTracker::New(); + + static const size_t kChunkSize = 1024; + static const string kChunkData(kChunkSize, 'a'); + + ::asio::io_service io_service; + auto conn = 
std::make_shared<MockDNConnection>(io_service); + BlockOpResponseProto block_op_resp; + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + + EXPECT_CALL(*conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))); + /* the second AsyncReadPacket should never attempt to read */ + + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + string data(kChunkSize, 0); + mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); + BlockReaderOptions options; + auto reader = std::make_shared<BlockReaderImpl>(options, conn, packet_canceller); + Status result; + reader->AsyncRequestBlock( + "libhdfs++", &block, data.size(), 0, + [buf, reader, &data, &io_service](const Status &stat) { + ASSERT_TRUE(stat.ok()); + reader->AsyncReadPacket( + buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + data.clear(); + data.resize(kChunkSize); + transferred = 0; + + /* Cancel the operation.*/ + packet_canceller->set_canceled(); + + reader->AsyncReadPacket( + buf, [&data,&io_service](const Status &stat, size_t transferred) { + ASSERT_EQ(stat.code(), Status::kOperationCanceled); + ASSERT_EQ(0, transferred); + io_service.stop(); + }); + }); + }); + io_service.run(); +} + + TEST(RemoteBlockReaderTest, TestSaslConnection) { static const size_t kChunkSize = 512; static const string kChunkData(kChunkSize, 'a');
