Repository: hadoop Updated Branches: refs/heads/HDFS-8707 d6d056d3b -> a06bc8e12
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc new file mode 100644 index 0000000..a4e21de --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.cc @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "reader/block_reader.h" +#include "reader/datatransfer.h" +#include "common/continuation/continuation.h" +#include "common/continuation/asio.h" + +#include <future> + +namespace hdfs { + +hadoop::hdfs::OpReadBlockProto +ReadBlockProto(const std::string &client_name, bool verify_checksum, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset) { + using namespace hadoop::hdfs; + using namespace hadoop::common; + BaseHeaderProto *base_h = new BaseHeaderProto(); + base_h->set_allocated_block(new ExtendedBlockProto(*block)); + if (token) { + base_h->set_allocated_token(new TokenProto(*token)); + } + ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); + h->set_clientname(client_name); + h->set_allocated_baseheader(base_h); + + OpReadBlockProto p; + p.set_allocated_header(h); + p.set_offset(offset); + p.set_len(length); + p.set_sendchecksums(verify_checksum); + // TODO: p.set_allocated_cachingstrategy(); + return p; +} + +void BlockReaderImpl::AsyncRequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset, const std::function<void(Status)> &handler) { + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. 
+ bytes_to_read_ = length; + + struct State { + std::string header; + hadoop::hdfs::OpReadBlockProto request; + hadoop::hdfs::BlockOpResponseProto response; + }; + + auto m = continuation::Pipeline<State>::Create(); + State *s = &m->state(); + + s->header.insert(s->header.begin(), + {0, kDataTransferVersion, Operation::kReadBlock}); + s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum, + dn_->token_.get(), block, length, offset)); + + auto read_pb_message = + new continuation::ReadDelimitedPBMessageContinuation<AsyncStream, 16384>( + dn_, &s->response); + + m->Push(asio_continuation::Write(dn_.get(), asio::buffer(s->header))) + .Push(asio_continuation::WriteDelimitedPBMessage(dn_, &s->request)) + .Push(read_pb_message); + + m->Run([this, handler, offset](const Status &status, const State &s) { Status stat = status; + if (stat.ok()) { + const auto &resp = s.response; + if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { + if (resp.has_readopchecksuminfo()) { + const auto &checksum_info = resp.readopchecksuminfo(); + chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); + } + state_ = kReadPacketHeader; + } else { + stat = Status::Error(s.response.message().c_str()); + } + } + handler(stat); + }); +} + +Status BlockReaderImpl::RequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset) { + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future(stat->get_future()); + AsyncRequestBlock(client_name, block, length, offset, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} + +hadoop::hdfs::OpReadBlockProto +ReadBlockProto(const std::string &client_name, bool verify_checksum, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset); + +struct BlockReaderImpl::ReadPacketHeader + : continuation::Continuation { + 
ReadPacketHeader(BlockReaderImpl *parent) : parent_(parent) {} + + virtual void Run(const Next &next) override { + parent_->packet_data_read_bytes_ = 0; + parent_->packet_len_ = 0; + auto handler = [next, this](const asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } else { + parent_->packet_len_ = packet_length(); + parent_->header_.Clear(); + bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart], + header_length()); + assert(v && "Failed to parse the header"); + parent_->state_ = kReadChecksum; + } + next(status); + }; + + asio::async_read(*parent_->dn_, asio::buffer(buf_), + std::bind(&ReadPacketHeader::CompletionHandler, this, + std::placeholders::_1, std::placeholders::_2), + handler); + } + +private: + static const size_t kMaxHeaderSize = 512; + static const size_t kPayloadLenOffset = 0; + static const size_t kPayloadLenSize = sizeof(int32_t); + static const size_t kHeaderLenOffset = 4; + static const size_t kHeaderLenSize = sizeof(int16_t); + static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; + + BlockReaderImpl *parent_; + std::array<char, kMaxHeaderSize> buf_; + + size_t packet_length() const { + return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset])); + } + + size_t header_length() const { + return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset])); + } + + size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { + if (ec) { + return 0; + } else if (transferred < kHeaderStart) { + return kHeaderStart - transferred; + } else { + return kHeaderStart + header_length() - transferred; + } + } +}; + +struct BlockReaderImpl::ReadChecksum : continuation::Continuation { + ReadChecksum(BlockReaderImpl *parent) : parent_(parent) {} + + virtual void Run(const Next &next) override { + auto parent = parent_; + if (parent->state_ != kReadChecksum) { + next(Status::OK()); + return; + } + + auto handler = [parent, next](const 
asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } else { + parent->state_ = + parent->chunk_padding_bytes_ ? kReadPadding : kReadData; + } + next(status); + }; + parent->checksum_.resize(parent->packet_len_ - sizeof(int) - + parent->header_.datalen()); + asio::async_read(*parent->dn_, asio::buffer(parent->checksum_), handler); + } + +private: + BlockReaderImpl *parent_; +}; + +struct BlockReaderImpl::ReadData : continuation::Continuation { + ReadData(BlockReaderImpl *parent, + std::shared_ptr<size_t> bytes_transferred, + const asio::mutable_buffers_1 &buf) + : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) { + buf_.begin(); + } + + ~ReadData() { + buf_.end(); + } + + virtual void Run(const Next &next) override { + auto handler = + [next, this](const asio::error_code &ec, size_t transferred) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } + *bytes_transferred_ += transferred; + parent_->bytes_to_read_ -= transferred; + parent_->packet_data_read_bytes_ += transferred; + if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { + parent_->state_ = kReadPacketHeader; + } + next(status); + }; + + auto data_len = + parent_->header_.datalen() - parent_->packet_data_read_bytes_; + asio::async_read(*parent_->dn_, buf_, asio::transfer_exactly(data_len), + handler); + } + +private: + BlockReaderImpl *parent_; + std::shared_ptr<size_t> bytes_transferred_; + const asio::mutable_buffers_1 buf_; +}; + +struct BlockReaderImpl::ReadPadding : continuation::Continuation { + ReadPadding(BlockReaderImpl *parent) + : parent_(parent), padding_(parent->chunk_padding_bytes_), + bytes_transferred_(std::make_shared<size_t>(0)), + read_data_(new ReadData( + parent, bytes_transferred_, asio::buffer(padding_))) {} + + virtual void Run(const Next &next) override { + if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { + next(Status::OK()); 
+ return; + } + + auto h = [next, this](const Status &status) { + if (status.ok()) { + assert(reinterpret_cast<const int &>(*bytes_transferred_) == + parent_->chunk_padding_bytes_); + parent_->chunk_padding_bytes_ = 0; + parent_->state_ = kReadData; + } + next(status); + }; + read_data_->Run(h); + } + +private: + BlockReaderImpl *parent_; + std::vector<char> padding_; + std::shared_ptr<size_t> bytes_transferred_; + std::shared_ptr<continuation::Continuation> read_data_; + ReadPadding(const ReadPadding &) = delete; + ReadPadding &operator=(const ReadPadding &) = delete; +}; + + +struct BlockReaderImpl::AckRead : continuation::Continuation { + AckRead(BlockReaderImpl *parent) : parent_(parent) {} + + virtual void Run(const Next &next) override { + if (parent_->bytes_to_read_ > 0) { + next(Status::OK()); + return; + } + + auto m = + continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(); + m->state().set_status(parent_->options_.verify_checksum + ? hadoop::hdfs::Status::CHECKSUM_OK + : hadoop::hdfs::Status::SUCCESS); + + m->Push( + continuation::WriteDelimitedPBMessage(parent_->dn_, &m->state())); + + m->Run([this, next](const Status &status, + const hadoop::hdfs::ClientReadStatusProto &) { + if (status.ok()) { + parent_->state_ = BlockReaderImpl::kFinished; + } + next(status); + }); + } + +private: + BlockReaderImpl *parent_; +}; + +void BlockReaderImpl::AsyncReadPacket( + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t bytes_transferred)> &handler) { + assert(state_ != kOpen && "Not connected"); + + struct State { + std::shared_ptr<size_t> bytes_transferred; + }; + auto m = continuation::Pipeline<State>::Create(); + m->state().bytes_transferred = std::make_shared<size_t>(0); + + m->Push(new ReadPacketHeader(this)) + .Push(new ReadChecksum(this)) + .Push(new ReadPadding(this)) + .Push(new ReadData( + this, m->state().bytes_transferred, buffers)) + .Push(new AckRead(this)); + + auto self = this->shared_from_this(); + 
m->Run([self, handler](const Status &status, const State &state) { + handler(status, *state.bytes_transferred); + }); +} + + +size_t +BlockReaderImpl::ReadPacket(const MutableBuffers &buffers, + Status *status) { + size_t transferred = 0; + auto done = std::make_shared<std::promise<void>>(); + auto future = done->get_future(); + AsyncReadPacket(buffers, + [status, &transferred, done](const Status &stat, size_t t) { + *status = stat; + transferred = t; + done->set_value(); + }); + future.wait(); + return transferred; +} + + +struct BlockReaderImpl::RequestBlockContinuation : continuation::Continuation { + RequestBlockContinuation(BlockReader *reader, const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset) + : reader_(reader), client_name_(client_name), length_(length), + offset_(offset) { + block_.CheckTypeAndMergeFrom(*block); + } + + virtual void Run(const Next &next) override { + reader_->AsyncRequestBlock(client_name_, &block_, length_, + offset_, next); + } + +private: + BlockReader *reader_; + const std::string client_name_; + hadoop::hdfs::ExtendedBlockProto block_; + uint64_t length_; + uint64_t offset_; +}; + +struct BlockReaderImpl::ReadBlockContinuation : continuation::Continuation { + ReadBlockContinuation(BlockReader *reader, MutableBuffers buffer, + size_t *transferred) + : reader_(reader), buffer_(buffer), + buffer_size_(asio::buffer_size(buffer)), transferred_(transferred) { + } + + virtual void Run(const Next &next) override { + *transferred_ = 0; + next_ = next; + OnReadData(Status::OK(), 0); + } + +private: + BlockReader *reader_; + const MutableBuffers buffer_; + const size_t buffer_size_; + size_t *transferred_; + std::function<void(const Status &)> next_; + + void OnReadData(const Status &status, size_t transferred) { + using std::placeholders::_1; + using std::placeholders::_2; + *transferred_ += transferred; + if (!status.ok()) { + next_(status); + } else if (*transferred_ >= 
buffer_size_) { + next_(status); + } else { + reader_->AsyncReadPacket( + asio::buffer(buffer_ + *transferred_, buffer_size_ - *transferred_), + std::bind(&ReadBlockContinuation::OnReadData, this, _1, _2)); + } + } +}; + +void BlockReaderImpl::AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, + size_t offset, + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t)> handler) { + + auto m = continuation::Pipeline<size_t>::Create(); + size_t * bytesTransferred = &m->state(); + + size_t size = asio::buffer_size(buffers); + + m->Push(new RequestBlockContinuation(this, client_name, + &block.b(), size, offset)) + .Push(new ReadBlockContinuation(this, buffers, bytesTransferred)); + + m->Run([handler] (const Status &status, + const size_t totalBytesTransferred) { + handler(status, totalBytesTransferred); + }); +} + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h index 81636b9..140286b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -19,7 +19,9 @@ #define BLOCK_READER_H_ #include "libhdfspp/status.h" +#include "common/async_stream.h" #include "datatransfer.pb.h" +#include "connection/datanodeconnection.h" #include <memory> @@ -55,38 +57,73 @@ struct BlockReaderOptions { : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {} }; -template <class Stream> -class RemoteBlockReader - : public 
std::enable_shared_from_this<RemoteBlockReader<Stream>> { +/** + * Handles the operational state of requesting and reading a block (or portion of + * a block) from a DataNode. + * + * Threading model: not thread-safe. + * Lifecycle: should be created, used for a single read, then freed. + */ +class BlockReader { public: - explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream) - : stream_(stream), state_(kOpen), options_(options), + virtual void AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t)> handler) = 0; + + virtual void AsyncReadPacket( + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t bytes_transferred)> &handler) = 0; + + virtual void AsyncRequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset, + const std::function<void(Status)> &handler) = 0; +}; + +class BlockReaderImpl + : public BlockReader, public std::enable_shared_from_this<BlockReaderImpl> { +public: + explicit BlockReaderImpl(const BlockReaderOptions &options, std::shared_ptr<DataNodeConnection> dn) + : dn_(dn), state_(kOpen), options_(options), chunk_padding_bytes_(0) {} - template <class MutableBufferSequence, class ReadHandler> - void async_read_some(const MutableBufferSequence &buffers, - const ReadHandler &handler); + virtual void AsyncReadPacket( + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t bytes_transferred)> &handler) override; + + virtual void AsyncRequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset, + const std::function<void(Status)> &handler) override; - template <class MutableBufferSequence> - size_t read_some(const MutableBufferSequence &buffers, Status *status); + virtual void 
AsyncReadBlock( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, size_t offset, + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t)> handler) override; - Status connect(const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset); + size_t ReadPacket(const MutableBuffers &buffers, Status *status); - template <class ConnectHandler> - void async_connect(const std::string &client_name, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, - uint64_t length, uint64_t offset, - const ConnectHandler &handler); + Status RequestBlock( + const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, + uint64_t offset); private: + struct RequestBlockContinuation; + struct ReadBlockContinuation; + struct ReadPacketHeader; struct ReadChecksum; struct ReadPadding; - template <class MutableBufferSequence> struct ReadData; + struct ReadData; struct AckRead; enum State { kOpen, @@ -97,7 +134,7 @@ private: kFinished, }; - Stream *stream_; + std::shared_ptr<DataNodeConnection> dn_; hadoop::hdfs::PacketHeaderProto header_; State state_; BlockReaderOptions options_; @@ -109,6 +146,4 @@ private: }; } -#include "remote_block_reader_impl.h" - #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h index 511c2eb..8be9ef8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h +++ 
b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h @@ -19,6 +19,10 @@ #define LIB_READER_DATA_TRANSFER_H_ #include "common/sasl_authenticator.h" +#include "common/async_stream.h" +#include "connection/datanodeconnection.h" +#include <memory> + namespace hdfs { @@ -32,26 +36,32 @@ enum Operation { kReadBlock = 81, }; -template <class Stream> class DataTransferSaslStream { +template <class Stream> class DataTransferSaslStream : public DataNodeConnection { public: - DataTransferSaslStream(Stream *stream, const std::string &username, + DataTransferSaslStream(std::shared_ptr<Stream> stream, const std::string &username, const std::string &password) : stream_(stream), authenticator_(username, password) {} template <class Handler> void Handshake(const Handler &next); - template <class MutableBufferSequence, class ReadHandler> - void async_read_some(const MutableBufferSequence &buffers, - ReadHandler &&handler); + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + stream_->async_read_some(buf, handler); + } - template <class ConstBufferSequence, class WriteHandler> - void async_write_some(const ConstBufferSequence &buffers, - WriteHandler &&handler); + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + stream_->async_write_some(buf, handler); + } + void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override + {(void)handler; /*TODO: Handshaking goes here*/}; private: DataTransferSaslStream(const DataTransferSaslStream &) = delete; DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete; - Stream *stream_; + std::shared_ptr<Stream> stream_; DigestMD5Authenticator authenticator_; struct ReadSaslMessage; struct Authenticator; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h index 088b86e..3ca16e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h @@ -70,7 +70,7 @@ private: template <class Stream> struct DataTransferSaslStream<Stream>::ReadSaslMessage : continuation::Continuation { - ReadSaslMessage(Stream *stream, std::string *data) + ReadSaslMessage(std::shared_ptr<Stream> stream, std::string *data) : stream_(stream), data_(data), read_pb_(stream, &resp_) {} virtual void Run(const Next &next) override { @@ -87,7 +87,7 @@ struct DataTransferSaslStream<Stream>::ReadSaslMessage } private: - Stream *stream_; + std::shared_ptr<Stream> stream_; std::string *data_; hadoop::hdfs::DataTransferEncryptorMessageProto resp_; continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_; @@ -97,7 +97,7 @@ template <class Stream> template <class Handler> void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { using ::hadoop::hdfs::DataTransferEncryptorMessageProto; - using ::hdfs::continuation::Write; + using ::hdfs::asio_continuation::Write; using ::hdfs::continuation::WriteDelimitedPBMessage; static const int kMagicNumber = htonl(kDataTransferSasl); @@ -109,7 +109,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { std::string resp0; DataTransferEncryptorMessageProto req1; std::string resp1; - Stream *stream; + std::shared_ptr<Stream> stream; }; auto m = 
continuation::Pipeline<State>::Create(); State *s = &m->state(); @@ -117,7 +117,7 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0); - m->Push(Write(stream_, kMagicNumberBuffer)) + m->Push(Write(stream_.get(), kMagicNumberBuffer)) .Push(WriteDelimitedPBMessage(stream_, &s->req0)) .Push(new ReadSaslMessage(stream_, &s->resp0)) .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1)) @@ -126,19 +126,6 @@ void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { m->Run([next](const Status &status, const State &) { next(status); }); } -template <class Stream> -template <class MutableBufferSequence, class ReadHandler> -void DataTransferSaslStream<Stream>::async_read_some( - const MutableBufferSequence &buffers, ReadHandler &&handler) { - stream_->async_read_some(buffers, handler); -} - -template <class Stream> -template <typename ConstBufferSequence, typename WriteHandler> -void DataTransferSaslStream<Stream>::async_write_some( - const ConstBufferSequence &buffers, WriteHandler &&handler) { - stream_->async_write_some(buffers, handler); -} } #endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h new file mode 100644 index 0000000..ad10165 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/fileinfo.h @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_READER_FILEINFO_H_ +#define LIB_READER_FILEINFO_H_ + +#include "ClientNamenodeProtocol.pb.h" + +namespace hdfs { + +/** + * Information that is assumed to be unchanging about a file for the duration of + * the operations. + */ +struct FileInfo { + unsigned long long file_length_; + std::vector<::hadoop::hdfs::LocatedBlockProto> blocks_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc deleted file mode 100644 index 68bc4ee..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "block_reader.h" - -namespace hdfs { - -hadoop::hdfs::OpReadBlockProto -ReadBlockProto(const std::string &client_name, bool verify_checksum, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset) { - using namespace hadoop::hdfs; - using namespace hadoop::common; - BaseHeaderProto *base_h = new BaseHeaderProto(); - base_h->set_allocated_block(new ExtendedBlockProto(*block)); - if (token) { - base_h->set_allocated_token(new TokenProto(*token)); - } - ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); - h->set_clientname(client_name); - h->set_allocated_baseheader(base_h); - - OpReadBlockProto p; - p.set_allocated_header(h); - p.set_offset(offset); - p.set_len(length); - p.set_sendchecksums(verify_checksum); - // TODO: p.set_allocated_cachingstrategy(); - return p; -} -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h deleted file mode 100644 index 35c2ce4..0000000 --- 
a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h +++ /dev/null @@ -1,342 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ -#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ - -#include "datatransfer.h" -#include "common/continuation/asio.h" -#include "common/continuation/protobuf.h" - -#include <asio/buffers_iterator.hpp> -#include <asio/streambuf.hpp> -#include <asio/write.hpp> - -#include <arpa/inet.h> - -#include <future> - -namespace hdfs { - -hadoop::hdfs::OpReadBlockProto -ReadBlockProto(const std::string &client_name, bool verify_checksum, - const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset); - -template <class Stream> -template <class ConnectHandler> -void RemoteBlockReader<Stream>::async_connect( - const std::string &client_name, const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset, const ConnectHandler &handler) { - // The total number of bytes that we need to transfer from the DN is - // the amount that the user wants 
(bytesToRead), plus the padding at - // the beginning in order to chunk-align. Note that the DN may elect - // to send more than this amount if the read starts/ends mid-chunk. - bytes_to_read_ = length; - - struct State { - std::string header; - hadoop::hdfs::OpReadBlockProto request; - hadoop::hdfs::BlockOpResponseProto response; - }; - - auto m = continuation::Pipeline<State>::Create(); - State *s = &m->state(); - - s->header.insert(s->header.begin(), - {0, kDataTransferVersion, Operation::kReadBlock}); - s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum, - token, block, length, offset)); - - auto read_pb_message = - new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>( - stream_, &s->response); - - m->Push(continuation::Write(stream_, asio::buffer(s->header))) - .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request)) - .Push(read_pb_message); - - m->Run([this, handler, offset](const Status &status, const State &s) { - Status stat = status; - if (stat.ok()) { - const auto &resp = s.response; - if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { - if (resp.has_readopchecksuminfo()) { - const auto &checksum_info = resp.readopchecksuminfo(); - chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); - } - state_ = kReadPacketHeader; - } else { - stat = Status::Error(s.response.message().c_str()); - } - } - handler(stat); - }); -} - -template <class Stream> -struct RemoteBlockReader<Stream>::ReadPacketHeader - : continuation::Continuation { - ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {} - - virtual void Run(const Next &next) override { - parent_->packet_data_read_bytes_ = 0; - parent_->packet_len_ = 0; - auto handler = [next, this](const asio::error_code &ec, size_t) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } else { - parent_->packet_len_ = packet_length(); - parent_->header_.Clear(); - bool v = 
parent_->header_.ParseFromArray(&buf_[kHeaderStart], - header_length()); - assert(v && "Failed to parse the header"); - parent_->state_ = kReadChecksum; - } - next(status); - }; - - asio::async_read(*parent_->stream_, asio::buffer(buf_), - std::bind(&ReadPacketHeader::CompletionHandler, this, - std::placeholders::_1, std::placeholders::_2), - handler); - } - -private: - static const size_t kMaxHeaderSize = 512; - static const size_t kPayloadLenOffset = 0; - static const size_t kPayloadLenSize = sizeof(int32_t); - static const size_t kHeaderLenOffset = 4; - static const size_t kHeaderLenSize = sizeof(int16_t); - static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; - - RemoteBlockReader<Stream> *parent_; - std::array<char, kMaxHeaderSize> buf_; - - size_t packet_length() const { - return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset])); - } - - size_t header_length() const { - return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset])); - } - - size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { - if (ec) { - return 0; - } else if (transferred < kHeaderStart) { - return kHeaderStart - transferred; - } else { - return kHeaderStart + header_length() - transferred; - } - } -}; - -template <class Stream> -struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation { - ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {} - - virtual void Run(const Next &next) override { - auto parent = parent_; - if (parent->state_ != kReadChecksum) { - next(Status::OK()); - return; - } - - auto handler = [parent, next](const asio::error_code &ec, size_t) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } else { - parent->state_ = - parent->chunk_padding_bytes_ ? 
kReadPadding : kReadData; - } - next(status); - }; - parent->checksum_.resize(parent->packet_len_ - sizeof(int) - - parent->header_.datalen()); - asio::async_read(*parent->stream_, asio::buffer(parent->checksum_), - handler); - } - -private: - RemoteBlockReader<Stream> *parent_; -}; - -template <class Stream> -struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation { - ReadPadding(RemoteBlockReader<Stream> *parent) - : parent_(parent), padding_(parent->chunk_padding_bytes_), - bytes_transferred_(std::make_shared<size_t>(0)), - read_data_(new ReadData<asio::mutable_buffers_1>( - parent, bytes_transferred_, asio::buffer(padding_))) {} - - virtual void Run(const Next &next) override { - if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { - next(Status::OK()); - return; - } - - auto h = [next, this](const Status &status) { - if (status.ok()) { - assert(reinterpret_cast<const int &>(*bytes_transferred_) == - parent_->chunk_padding_bytes_); - parent_->chunk_padding_bytes_ = 0; - parent_->state_ = kReadData; - } - next(status); - }; - read_data_->Run(h); - } - -private: - RemoteBlockReader<Stream> *parent_; - std::vector<char> padding_; - std::shared_ptr<size_t> bytes_transferred_; - std::shared_ptr<continuation::Continuation> read_data_; - ReadPadding(const ReadPadding &) = delete; - ReadPadding &operator=(const ReadPadding &) = delete; -}; - -template <class Stream> -template <class MutableBufferSequence> -struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation { - ReadData(RemoteBlockReader<Stream> *parent, - std::shared_ptr<size_t> bytes_transferred, - const MutableBufferSequence &buf) - : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {} - - virtual void Run(const Next &next) override { - auto handler = - [next, this](const asio::error_code &ec, size_t transferred) { - Status status; - if (ec) { - status = Status(ec.value(), ec.message().c_str()); - } - *bytes_transferred_ += transferred; - 
parent_->bytes_to_read_ -= transferred; - parent_->packet_data_read_bytes_ += transferred; - if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { - parent_->state_ = kReadPacketHeader; - } - next(status); - }; - - auto data_len = - parent_->header_.datalen() - parent_->packet_data_read_bytes_; - async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len), - handler); - } - -private: - RemoteBlockReader<Stream> *parent_; - std::shared_ptr<size_t> bytes_transferred_; - MutableBufferSequence buf_; -}; - -template <class Stream> -struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation { - AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {} - - virtual void Run(const Next &next) override { - if (parent_->bytes_to_read_ > 0) { - next(Status::OK()); - return; - } - - auto m = - continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(); - m->state().set_status(parent_->options_.verify_checksum - ? hadoop::hdfs::Status::CHECKSUM_OK - : hadoop::hdfs::Status::SUCCESS); - - m->Push( - continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state())); - - m->Run([this, next](const Status &status, - const hadoop::hdfs::ClientReadStatusProto &) { - if (status.ok()) { - parent_->state_ = RemoteBlockReader<Stream>::kFinished; - } - next(status); - }); - } - -private: - RemoteBlockReader<Stream> *parent_; -}; - -template <class Stream> -template <class MutableBufferSequence, class ReadHandler> -void RemoteBlockReader<Stream>::async_read_some( - const MutableBufferSequence &buffers, const ReadHandler &handler) { - assert(state_ != kOpen && "Not connected"); - - struct State { - std::shared_ptr<size_t> bytes_transferred; - }; - auto m = continuation::Pipeline<State>::Create(); - m->state().bytes_transferred = std::make_shared<size_t>(0); - - m->Push(new ReadPacketHeader(this)) - .Push(new ReadChecksum(this)) - .Push(new ReadPadding(this)) - .Push(new ReadData<MutableBufferSequence>( - this, 
m->state().bytes_transferred, buffers)) - .Push(new AckRead(this)); - - auto self = this->shared_from_this(); - m->Run([self, handler](const Status &status, const State &state) { - handler(status, *state.bytes_transferred); - }); -} - -template <class Stream> -template <class MutableBufferSequence> -size_t -RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers, - Status *status) { - size_t transferred = 0; - auto done = std::make_shared<std::promise<void>>(); - auto future = done->get_future(); - async_read_some(buffers, - [status, &transferred, done](const Status &stat, size_t t) { - *status = stat; - transferred = t; - done->set_value(); - }); - future.wait(); - return transferred; -} - -template <class Stream> -Status RemoteBlockReader<Stream>::connect( - const std::string &client_name, const hadoop::common::TokenProto *token, - const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, - uint64_t offset) { - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future(stat->get_future()); - async_connect(client_name, token, block, length, offset, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc index 83721a7..29d455f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -19,9 +19,6 @@ #include "rpc_connection.h" #include "common/util.h" -#include <openssl/rand.h> - -#include <sstream> 
#include <future> namespace hdfs { @@ -83,15 +80,4 @@ Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, return future.get(); } -std::string RpcEngine::GetRandomClientName() { - unsigned char buf[6] = { - 0, - }; - RAND_pseudo_bytes(buf, sizeof(buf)); - - std::stringstream ss; - ss << "libhdfs++_" - << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf))); - return ss.str(); -} } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt index 7d06141..cc5ab01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -32,7 +32,7 @@ include_directories( ${LIBHDFS_SRC_DIR} ${OS_DIR} ) -add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs_cpp.cc) +add_library(hdfspp_test_shim_static STATIC hdfs_shim.c libhdfs_wrapper.c libhdfspp_wrapper.cc ${LIBHDFSPP_BINDING_C}/hdfs.cc) add_library(test_common OBJECT mock_connection.cc) @@ -44,24 +44,20 @@ protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS ) add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>) -target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(remote_block_reader_test reader proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(remote_block_reader remote_block_reader_test) 
add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc) target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(sasl_digest_md5 sasl_digest_md5_test) -add_executable(inputstream_test inputstream_test.cc) -target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) -add_test(inputstream inputstream_test) - include_directories(${CMAKE_CURRENT_BINARY_DIR}) add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>) target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(rpc_engine rpc_engine_test) add_executable(bad_datanode_test bad_datanode_test.cc) -target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(bad_datanode_test rpc reader proto fs bindings_c rpc proto common reader connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) add_test(bad_datanode bad_datanode_test) add_executable(node_exclusion_test node_exclusion_test.cc) @@ -73,5 +69,5 @@ target_link_libraries(configuration_test common gmock_main ${CMAKE_THREAD_LIBS_I add_test(configuration configuration_test) build_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static expect.c test_libhdfs_threaded.c ${OS_DIR}/thread.c) -link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY}) +link_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static fs reader rpc proto common connection ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} native_mini_dfs ${JAVA_JVM_LIBRARY}) add_libhdfs_test(libhdfs_threaded hdfspp_test_shim_static) 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc index cf1fdbb..0f69195 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/bad_datanode_test.cc @@ -19,6 +19,8 @@ #include "fs/filesystem.h" #include "fs/bad_datanode_tracker.h" +#include "common/util.h" + #include <gmock/gmock.h> using hadoop::common::TokenProto; @@ -34,70 +36,140 @@ using ::testing::Return; using namespace hdfs; -class MockReader { - public: - virtual ~MockReader() {} +class MockReader : public BlockReader { +public: MOCK_METHOD2( - async_read_some, + AsyncReadPacket, void(const asio::mutable_buffers_1 &, const std::function<void(const Status &, size_t transferred)> &)); - MOCK_METHOD6(async_connect, - void(const std::string &, TokenProto *, ExtendedBlockProto *, - uint64_t, uint64_t, - const std::function<void(const Status &)> &)); + MOCK_METHOD5(AsyncRequestBlock, + void(const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset, + const std::function<void(Status)> &handler)); + + MOCK_METHOD5(AsyncReadBlock, void( + const std::string & client_name, + const hadoop::hdfs::LocatedBlockProto &block, + size_t offset, + const MutableBuffers &buffers, + const std::function<void(const Status &, size_t)> handler)); }; -template <class Trait> -struct MockBlockReaderTrait { - typedef MockReader Reader; - struct State { - MockReader reader_; - size_t transferred_; - Reader *reader() { return &reader_; } - size_t 
*transferred() { return &transferred_; } - const size_t *transferred() const { return &transferred_; } - }; +class MockDNConnection : public DataNodeConnection, public std::enable_shared_from_this<MockDNConnection> { + void Connect(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)> handler) override { + handler(Status::OK(), shared_from_this()); + } + + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + (void)buf; + handler(asio::error::fault, 0); + } - static continuation::Pipeline<State> *CreatePipeline( - ::asio::io_service *, const DatanodeInfoProto &) { - auto m = continuation::Pipeline<State>::Create(); - *m->state().transferred() = 0; - Trait::InitializeMockReader(m->state().reader()); - return m; + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + (void)buf; + handler(asio::error::fault, 0); } }; + +class PartialMockFileHandle : public FileHandleImpl { + using FileHandleImpl::FileHandleImpl; +public: + std::shared_ptr<MockReader> mock_reader_ = std::make_shared<MockReader>(); +protected: + std::shared_ptr<BlockReader> CreateBlockReader(const BlockReaderOptions &options, + std::shared_ptr<DataNodeConnection> dn) override + { + (void) options; (void) dn; + assert(mock_reader_); + return mock_reader_; + } + std::shared_ptr<DataNodeConnection> CreateDataNodeConnection( + ::asio::io_service *io_service, + const ::hadoop::hdfs::DatanodeInfoProto & dn, + const hadoop::common::TokenProto * token) override { + (void) io_service; (void) dn; (void) token; + return std::make_shared<MockDNConnection>(); + } + + +}; + +TEST(BadDataNodeTest, TestNoNodes) { + auto file_info = std::make_shared<struct FileInfo>(); + file_info->blocks_.push_back(LocatedBlockProto()); + LocatedBlockProto & block = file_info->blocks_[0]; + ExtendedBlockProto 
*b = block.mutable_b(); + b->set_poolid(""); + b->set_blockid(1); + b->set_generationstamp(1); + b->set_numbytes(4096); + + // Set up the one block to have one datanode holding it + DatanodeInfoProto *di = block.add_locs(); + DatanodeIDProto *dnid = di->mutable_id(); + dnid->set_datanodeuuid("foo"); + + char buf[4096] = { + 0, + }; + IoServiceImpl io_service; + auto bad_node_tracker = std::make_shared<BadDataNodeTracker>(); + bad_node_tracker->AddBadNode("foo"); + + PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, bad_node_tracker); + Status stat; + size_t read = 0; + + // Exclude the one datanode with the data + is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), nullptr, + [&stat, &read](const Status &status, const std::string &, size_t transferred) { + stat = status; + read = transferred; + }); + + // Should fail with no resource available + ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code()); + ASSERT_EQ(0UL, read); +} + TEST(BadDataNodeTest, RecoverableError) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; + auto file_info = std::make_shared<struct FileInfo>(); + file_info->blocks_.push_back(LocatedBlockProto()); + LocatedBlockProto & block = file_info->blocks_[0]; + ExtendedBlockProto *b = block.mutable_b(); + b->set_poolid(""); + b->set_blockid(1); + b->set_generationstamp(1); + b->set_numbytes(4096); + + // Set up the one block to have one datanode holding it + DatanodeInfoProto *di = block.add_locs(); + DatanodeIDProto *dnid = di->mutable_id(); + dnid->set_datanodeuuid("foo"); + char buf[4096] = { 0, }; IoServiceImpl io_service; - Options default_options; - FileSystemImpl fs(&io_service, default_options); auto tracker = std::make_shared<BadDataNodeTracker>(); - InputStreamImpl is(&fs, &blocks, tracker); + PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker); Status stat; size_t read = 0; - struct Trait { - static 
void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - // resource unavailable error - .WillOnce(InvokeArgument<1>( - Status::ResourceUnavailable( - "Unable to get some resource, try again later"), - sizeof(buf))); - } - }; + EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) + // resource unavailable error + .WillOnce(InvokeArgument<4>( + Status::ResourceUnavailable("Unable to get some resource, try again later"), 0)); - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), + + is.AsyncPreadSome( + 0, asio::buffer(buf, sizeof(buf)), nullptr, [&stat, &read](const Status &status, const std::string &, size_t transferred) { stat = status; @@ -108,7 +180,7 @@ TEST(BadDataNodeTest, RecoverableError) { std::string failing_dn = "id_of_bad_datanode"; if (!stat.ok()) { - if (InputStream::ShouldExclude(stat)) { + if (FileHandle::ShouldExclude(stat)) { tracker->AddBadNode(failing_dn); } } @@ -117,35 +189,37 @@ TEST(BadDataNodeTest, RecoverableError) { } TEST(BadDataNodeTest, InternalError) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; + auto file_info = std::make_shared<struct FileInfo>(); + file_info->blocks_.push_back(LocatedBlockProto()); + LocatedBlockProto & block = file_info->blocks_[0]; + ExtendedBlockProto *b = block.mutable_b(); + b->set_poolid(""); + b->set_blockid(1); + b->set_generationstamp(1); + b->set_numbytes(4096); + + // Set up the one block to have one datanode holding it + DatanodeInfoProto *di = block.add_locs(); + DatanodeIDProto *dnid = di->mutable_id(); + dnid->set_datanodeuuid("foo"); + char buf[4096] = { 0, }; IoServiceImpl io_service; - Options default_options; auto tracker = std::make_shared<BadDataNodeTracker>(); - FileSystemImpl fs(&io_service, default_options); - InputStreamImpl is(&fs, &blocks, tracker); + 
PartialMockFileHandle is(&io_service.io_service(), GetRandomClientName(), file_info, tracker); Status stat; size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - // something bad happened on the DN, calling again isn't going to help - .WillOnce( - InvokeArgument<1>(Status::Exception("server_explosion_exception", - "the server exploded"), + EXPECT_CALL(*is.mock_reader_, AsyncReadBlock(_,_,_,_,_)) + // resource unavailable error + .WillOnce(InvokeArgument<4>( + Status::Exception("server_explosion_exception", + "the server exploded"), sizeof(buf))); - } - }; - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), + is.AsyncPreadSome( + 0, asio::buffer(buf, sizeof(buf)), nullptr, [&stat, &read](const Status &status, const std::string &, size_t transferred) { stat = status; @@ -156,7 +230,7 @@ TEST(BadDataNodeTest, InternalError) { std::string failing_dn = "id_of_bad_datanode"; if (!stat.ok()) { - if (InputStream::ShouldExclude(stat)) { + if (FileHandle::ShouldExclude(stat)) { tracker->AddBadNode(failing_dn); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc deleted file mode 100644 index 786b846..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "fs/filesystem.h" -#include "fs/bad_datanode_tracker.h" -#include <gmock/gmock.h> - -using hadoop::common::TokenProto; -using hadoop::hdfs::DatanodeInfoProto; -using hadoop::hdfs::DatanodeIDProto; -using hadoop::hdfs::ExtendedBlockProto; -using hadoop::hdfs::LocatedBlockProto; -using hadoop::hdfs::LocatedBlocksProto; - -using ::testing::_; -using ::testing::InvokeArgument; -using ::testing::Return; - -using namespace hdfs; - -namespace hdfs { - -class MockReader { - public: - virtual ~MockReader() {} - MOCK_METHOD2( - async_read_some, - void(const asio::mutable_buffers_1 &, - const std::function<void(const Status &, size_t transferred)> &)); - - MOCK_METHOD6(async_connect, - void(const std::string &, TokenProto *, ExtendedBlockProto *, - uint64_t, uint64_t, - const std::function<void(const Status &)> &)); -}; - -template <class Trait> -struct MockBlockReaderTrait { - typedef MockReader Reader; - struct State { - MockReader reader_; - size_t transferred_; - Reader *reader() { return &reader_; } - size_t *transferred() { return &transferred_; } - const size_t *transferred() const { return &transferred_; } - }; - - static continuation::Pipeline<State> *CreatePipeline( - ::asio::io_service *, const DatanodeInfoProto &) { - auto m = 
continuation::Pipeline<State>::Create(); - *m->state().transferred() = 0; - Trait::InitializeMockReader(m->state().reader()); - return m; - } -}; -} - -TEST(InputStreamTest, TestReadSingleTrunk) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>()); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); - } - }; - - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(sizeof(buf), read); - read = 0; -} - -TEST(InputStreamTest, TestReadMultipleTrunk) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>()); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .Times(4) - .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); - } - }; - - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - 
read = transferred; - }); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(sizeof(buf), read); - read = 0; -} - -TEST(InputStreamTest, TestReadError) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>()); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); - } - }; - - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_FALSE(stat.ok()); - ASSERT_EQ(sizeof(buf) / 4 * 3, read); - read = 0; -} - -TEST(InputStreamTest, TestExcludeDataNode) { - LocatedBlocksProto blocks; - LocatedBlockProto *block = blocks.add_blocks(); - ExtendedBlockProto *b = block->mutable_b(); - b->set_poolid(""); - b->set_blockid(1); - b->set_generationstamp(1); - b->set_numbytes(4096); - - DatanodeInfoProto *di = block->add_locs(); - DatanodeIDProto *dnid = di->mutable_id(); - dnid->set_datanodeuuid("foo"); - - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks, std::make_shared<BadDataNodeTracker>()); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, 
_, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); - } - }; - - std::shared_ptr<NodeExclusionRule> exclude_set = - std::make_shared<ExclusionSet>(std::set<std::string>({"foo"})); - is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), exclude_set, - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), - stat.code()); - ASSERT_EQ(0UL, read); -} - -int main(int argc, char *argv[]) { - // The following line must be executed to initialize Google Mock - // (and Google Test) before running the tests. - ::testing::InitGoogleMock(&argc, argv); - return RUN_ALL_TESTS(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h index 8c0ef8c..4c15375 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h @@ -18,6 +18,8 @@ #ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_ #define LIBHDFSPP_TEST_MOCK_CONNECTION_H_ +#include "common/async_stream.h" + #include <asio/error_code.hpp> #include <asio/buffer.hpp> #include <asio/streambuf.hpp> @@ -27,13 +29,15 @@ namespace hdfs { -class MockConnectionBase { +class MockConnectionBase : public AsyncStream{ public: MockConnectionBase(::asio::io_service *io_service); virtual ~MockConnectionBase(); typedef std::pair<asio::error_code, std::string> ProducerResult; - template 
<class MutableBufferSequence, class Handler> - void async_read_some(const MutableBufferSequence &buf, Handler &&handler) { + + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { if (produced_.size() == 0) { ProducerResult r = Produce(); if (r.first) { @@ -51,8 +55,9 @@ public: io_service_->post(std::bind(handler, asio::error_code(), len)); } - template <class ConstBufferSequence, class Handler> - void async_write_some(const ConstBufferSequence &buf, Handler &&handler) { + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { // CompletionResult res = OnWrite(buf); io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf))); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a06bc8e1/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc index f54b14f..6f3122e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc @@ -21,6 +21,8 @@ #include "datatransfer.pb.h" #include "common/util.h" #include "reader/block_reader.h" +#include "reader/datatransfer.h" +#include "reader/fileinfo.h" #include <google/protobuf/io/coded_stream.h> #include <google/protobuf/io/zero_copy_stream_impl.h> @@ -36,10 +38,14 @@ using ::hadoop::hdfs::DataTransferEncryptorMessageProto; using ::hadoop::hdfs::ExtendedBlockProto; using 
::hadoop::hdfs::PacketHeaderProto; using ::hadoop::hdfs::ReadOpChecksumInfoProto; +using ::hadoop::hdfs::LocatedBlockProto; +using ::hadoop::hdfs::LocatedBlocksProto; using ::asio::buffer; using ::asio::error_code; using ::asio::mutable_buffers_1; +using ::testing::_; +using ::testing::InvokeArgument; using ::testing::Return; using std::make_pair; using std::string; @@ -49,12 +55,47 @@ namespace pbio = pb::io; namespace hdfs { -class MockDNConnection : public MockConnectionBase { - public: +class MockDNConnection : public MockConnectionBase, public DataNodeConnection{ +public: MockDNConnection(::asio::io_service &io_service) : MockConnectionBase(&io_service) {} MOCK_METHOD0(Produce, ProducerResult()); + + MOCK_METHOD1(Connect, void(std::function<void(Status status, std::shared_ptr<DataNodeConnection> dn)>)); + + void async_read_some(const MutableBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + this->MockConnectionBase::async_read_some(buf, handler); + } + + void async_write_some(const ConstBuffers &buf, + std::function<void (const asio::error_code & error, + std::size_t bytes_transferred) > handler) override { + this->MockConnectionBase::async_write_some(buf, handler); + } +}; + +// Mocks AsyncReadPacket and AsyncRequestBlock but not AsyncReadBlock, so we +// can test the logic of AsyncReadBlock +class PartialMockReader : public BlockReaderImpl { +public: + PartialMockReader() : + BlockReaderImpl(BlockReaderOptions(), std::shared_ptr<DataNodeConnection>()) {}; + + MOCK_METHOD2( + AsyncReadPacket, + void(const asio::mutable_buffers_1 &, + const std::function<void(const Status &, size_t transferred)> &)); + + MOCK_METHOD5(AsyncRequestBlock, + void(const std::string &client_name, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset, + const std::function<void(Status)> &handler)); }; + + } static inline string ToDelimitedString(const pb::MessageLite *msg) { @@ 
-94,20 +135,102 @@ static inline std::pair<error_code, string> ProducePacket( return std::make_pair(error_code(), std::move(payload)); } +TEST(RemoteBlockReaderTest, TestReadSingleTrunk) { + auto file_info = std::make_shared<struct FileInfo>(); + LocatedBlocksProto blocks; + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + + Status stat; + size_t read = 0; + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(RemoteBlockReaderTest, TestReadMultipleTrunk) { + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + Status stat; + size_t read = 0; + + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .Times(4) + .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(sizeof(buf), read); + read = 0; +} + +TEST(RemoteBlockReaderTest, TestReadError) { + LocatedBlockProto block; + char buf[4096] = { + 0, + }; + Status stat; + size_t read = 0; + PartialMockReader reader; + EXPECT_CALL(reader, AsyncRequestBlock(_, _, _, _, _)) + .WillOnce(InvokeArgument<4>(Status::OK())); + + EXPECT_CALL(reader, AsyncReadPacket(_, _)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + 
.WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) + .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); + + reader.AsyncReadBlock( + GetRandomClientName(), block, 0, asio::buffer(buf, sizeof(buf)), + [&stat, &read](const Status &status, size_t transferred) { + stat = status; + read = transferred; + }); + ASSERT_FALSE(stat.ok()); + ASSERT_EQ(sizeof(buf) / 4 * 3, read); + read = 0; +} + template <class Stream = MockDNConnection, class Handler> -static std::shared_ptr<RemoteBlockReader<Stream>> ReadContent( - Stream *conn, TokenProto *token, const ExtendedBlockProto &block, - uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, - const Handler &handler) { +static std::shared_ptr<BlockReaderImpl> +ReadContent(std::shared_ptr<Stream> conn, const ExtendedBlockProto &block, + uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, + const Handler &handler) { BlockReaderOptions options; - auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn); + auto reader = std::make_shared<BlockReaderImpl>(options, conn); Status result; - reader->async_connect("libhdfs++", token, &block, length, offset, + reader->AsyncRequestBlock("libhdfs++", &block, length, offset, [buf, reader, handler](const Status &stat) { if (!stat.ok()) { handler(stat, 0); } else { - reader->async_read_some(buf, handler); + reader->AsyncReadPacket(buf, handler); } }); return reader; @@ -117,11 +240,11 @@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) { static const size_t kChunkSize = 512; static const string kChunkData(kChunkSize, 'a'); ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared<MockDNConnection>(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); @@ -130,16 +253,19 
@@ TEST(RemoteBlockReaderTest, TestReadWholeBlock) { block.set_blockid(0); block.set_generationstamp(0); + bool done = false; std::string data(kChunkSize, 0); - ReadContent(&conn, nullptr, block, kChunkSize, 0, + ReadContent(conn, block, kChunkSize, 0, buffer(const_cast<char *>(data.c_str()), data.size()), - [&data, &io_service](const Status &stat, size_t transferred) { + [&data, &io_service, &done](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkData, data); + done = true; io_service.stop(); }); io_service.run(); + ASSERT_TRUE(done); } TEST(RemoteBlockReaderTest, TestReadWithinChunk) { @@ -149,7 +275,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b'); ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared<MockDNConnection>(io_service); BlockOpResponseProto block_op_resp; ReadOpChecksumInfoProto *checksum_info = block_op_resp.mutable_readopchecksuminfo(); @@ -159,7 +285,7 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { checksum->set_bytesperchecksum(512); block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true))); @@ -168,16 +294,20 @@ TEST(RemoteBlockReaderTest, TestReadWithinChunk) { block.set_blockid(0); block.set_generationstamp(0); + bool done = false; + string data(kLength, 0); - ReadContent(&conn, nullptr, block, data.size(), kOffset, + ReadContent(conn, block, data.size(), kOffset, buffer(const_cast<char *>(data.c_str()), data.size()), - [&data, &io_service](const Status &stat, size_t transferred) { + [&data, &io_service,&done](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kLength, transferred); ASSERT_EQ(kChunkData.substr(kOffset, kLength), 
data); + done = true; io_service.stop(); }); io_service.run(); + ASSERT_TRUE(done); } TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { @@ -185,11 +315,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { static const string kChunkData(kChunkSize, 'a'); ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared<MockDNConnection>(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))) .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true))); @@ -202,25 +332,22 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { string data(kChunkSize, 0); mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); BlockReaderOptions options; - auto reader = - std::make_shared<RemoteBlockReader<MockDNConnection>>(options, &conn); + auto reader = std::make_shared<BlockReaderImpl>(options, conn); Status result; - reader->async_connect( - "libhdfs++", nullptr, &block, data.size(), 0, + reader->AsyncRequestBlock( + "libhdfs++", &block, data.size(), 0, [buf, reader, &data, &io_service](const Status &stat) { ASSERT_TRUE(stat.ok()); - reader->async_read_some( - buf, [buf, reader, &data, &io_service](const Status &stat, - size_t transferred) { + reader->AsyncReadPacket( + buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkData, data); data.clear(); data.resize(kChunkSize); transferred = 0; - reader->async_read_some( - buf, - [&data, &io_service](const Status &stat, size_t transferred) { + reader->AsyncReadPacket( + buf, [&data,&io_service](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok()); ASSERT_EQ(kChunkSize, transferred); ASSERT_EQ(kChunkData, 
data); @@ -234,12 +361,11 @@ TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { TEST(RemoteBlockReaderTest, TestSaslConnection) { static const size_t kChunkSize = 512; static const string kChunkData(kChunkSize, 'a'); - static const string kAuthPayload = - "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" - "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," - "charset=utf-8,algorithm=md5-sess"; + static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" + "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," + "charset=utf-8,algorithm=md5-sess"; ::asio::io_service io_service; - MockDNConnection conn(io_service); + auto conn = std::make_shared<MockDNConnection>(io_service); BlockOpResponseProto block_op_resp; block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); @@ -252,23 +378,23 @@ TEST(RemoteBlockReaderTest, TestSaslConnection) { ::hadoop::hdfs:: DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); - EXPECT_CALL(conn, Produce()) + EXPECT_CALL(*conn, Produce()) .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0)))) .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1)))) .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); - DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar"); + auto sasl_conn = std::make_shared<DataTransferSaslStream<MockDNConnection> >(conn, "foo", "bar"); ExtendedBlockProto block; block.set_poolid("foo"); block.set_blockid(0); block.set_generationstamp(0); std::string data(kChunkSize, 0); - sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service]( + sasl_conn->Handshake([sasl_conn, &block, &data, &io_service]( const Status &s) { ASSERT_TRUE(s.ok()); - ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0, + ReadContent(sasl_conn, block, kChunkSize, 0, buffer(const_cast<char *>(data.c_str()), data.size()), [&data, &io_service](const Status &stat, size_t transferred) { ASSERT_TRUE(stat.ok());
