http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt new file mode 100644 index 0000000..71e28ac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/CMakeLists.txt @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +add_library(reader remote_block_reader.cc datatransfer.cc) +add_dependencies(reader proto)
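The block_reader.h header that follows declares the templated RemoteBlockReader built into the new `reader` library above. As a rough orientation sketch of how its API fits together — not part of the patch; the client name, include path, and 4096-byte read are made up, Stream is assumed to be asio::ip::tcp::socket on an already-connected socket, and the socket's io_service is assumed to be running on another thread, since the synchronous read_some() blocks on a std::future internally:

#include "reader/block_reader.h"

#include <asio/ip/tcp.hpp>
#include <memory>

// Sketch only: none of the literal values below come from the patch.
void ExampleBlockRead(asio::ip::tcp::socket *sock,
                      const hadoop::hdfs::ExtendedBlockProto &block) {
  hdfs::BlockReaderOptions options;  // verify_checksum defaults to true
  auto reader = std::make_shared<
      hdfs::RemoteBlockReader<asio::ip::tcp::socket>>(options, sock);
  // Ask for the first 4096 bytes of the block; nullptr means no access token.
  hdfs::Status stat =
      reader->connect("example-client", nullptr, &block, 4096, 0);
  if (!stat.ok()) {
    return;
  }
  char buf[4096];
  size_t transferred = reader->read_some(asio::buffer(buf), &stat);
  (void)transferred;  // bytes delivered so far; check stat for errors
}

The reader is held in a std::shared_ptr because RemoteBlockReader derives from std::enable_shared_from_this and keeps itself alive across the continuation pipeline inside async_read_some().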
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h new file mode 100644 index 0000000..81636b9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/block_reader.h @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef BLOCK_READER_H_ +#define BLOCK_READER_H_ + +#include "libhdfspp/status.h" +#include "datatransfer.pb.h" + +#include <memory> + +namespace hdfs { + +struct CacheStrategy { + bool drop_behind_specified; + bool drop_behind; + bool read_ahead_specified; + unsigned long long read_ahead; + CacheStrategy() + : drop_behind_specified(false), drop_behind(false), + read_ahead_specified(false), read_ahead(0) {} +}; + +enum DropBehindStrategy { + kUnspecified = 0, + kEnableDropBehind = 1, + kDisableDropBehind = 2, +}; + +enum EncryptionScheme { + kNone = 0, + kAESCTRNoPadding = 1, +}; + +struct BlockReaderOptions { + bool verify_checksum; + CacheStrategy cache_strategy; + EncryptionScheme encryption_scheme; + + BlockReaderOptions() + : verify_checksum(true), encryption_scheme(EncryptionScheme::kNone) {} +}; + +template <class Stream> +class RemoteBlockReader + : public std::enable_shared_from_this<RemoteBlockReader<Stream>> { +public: + explicit RemoteBlockReader(const BlockReaderOptions &options, Stream *stream) + : stream_(stream), state_(kOpen), options_(options), + chunk_padding_bytes_(0) {} + + template <class MutableBufferSequence, class ReadHandler> + void async_read_some(const MutableBufferSequence &buffers, + const ReadHandler &handler); + + template <class MutableBufferSequence> + size_t read_some(const MutableBufferSequence &buffers, Status *status); + + Status connect(const std::string &client_name, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset); + + template <class ConnectHandler> + void async_connect(const std::string &client_name, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, + uint64_t length, uint64_t offset, + const ConnectHandler &handler); + +private: + struct ReadPacketHeader; + struct ReadChecksum; + struct ReadPadding; + template <class MutableBufferSequence> struct ReadData; + struct AckRead; + enum State { + kOpen, + kReadPacketHeader, + kReadChecksum, + kReadPadding, + kReadData, + kFinished, + }; + + Stream *stream_; + 
hadoop::hdfs::PacketHeaderProto header_; + State state_; + BlockReaderOptions options_; + size_t packet_len_; + int packet_data_read_bytes_; + int chunk_padding_bytes_; + long long bytes_to_read_; + std::vector<char> checksum_; +}; +} + +#include "remote_block_reader_impl.h" + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc new file mode 100644 index 0000000..d936407 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.cc @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "datatransfer.h" + +#include "libhdfspp/status.h" + +namespace hdfs { + +namespace DataTransferSaslStreamUtil { + +static const auto kSUCCESS = hadoop::hdfs::DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS; + +using hadoop::hdfs::DataTransferEncryptorMessageProto; + +Status ConvertToStatus(const DataTransferEncryptorMessageProto *msg, std::string *payload) { + using namespace hadoop::hdfs; + auto s = msg->status(); + if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR_UNKNOWN_KEY) { + payload->clear(); + return Status::Exception("InvalidEncryptionKeyException", msg->message().c_str()); + } else if (s == DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR) { + payload->clear(); + return Status::Error(msg->message().c_str()); + } else { + *payload = msg->payload(); + return Status::OK(); + } +} + +void PrepareInitialHandshake(DataTransferEncryptorMessageProto *msg) { + msg->set_status(kSUCCESS); + msg->set_payload(""); +} + +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h new file mode 100644 index 0000000..511c2eb --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer.h @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_READER_DATA_TRANSFER_H_ +#define LIB_READER_DATA_TRANSFER_H_ + +#include "common/sasl_authenticator.h" + +namespace hdfs { + +enum { + kDataTransferVersion = 28, + kDataTransferSasl = 0xdeadbeef, +}; + +enum Operation { + kWriteBlock = 80, + kReadBlock = 81, +}; + +template <class Stream> class DataTransferSaslStream { +public: + DataTransferSaslStream(Stream *stream, const std::string &username, + const std::string &password) + : stream_(stream), authenticator_(username, password) {} + + template <class Handler> void Handshake(const Handler &next); + + template <class MutableBufferSequence, class ReadHandler> + void async_read_some(const MutableBufferSequence &buffers, + ReadHandler &&handler); + + template <class ConstBufferSequence, class WriteHandler> + void async_write_some(const ConstBufferSequence &buffers, + WriteHandler &&handler); + +private: + DataTransferSaslStream(const DataTransferSaslStream &) = delete; + DataTransferSaslStream &operator=(const DataTransferSaslStream &) = delete; + Stream *stream_; + DigestMD5Authenticator authenticator_; + struct ReadSaslMessage; + struct Authenticator; +}; +} + +#include "datatransfer_impl.h" + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h new file mode 100644 index 0000000..088b86e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/datatransfer_impl.h @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIB_READER_DATATRANSFER_IMPL_H_ +#define LIB_READER_DATATRANSFER_IMPL_H_ + +#include "datatransfer.pb.h" +#include "common/continuation/continuation.h" +#include "common/continuation/asio.h" +#include "common/continuation/protobuf.h" + +#include <asio/read.hpp> +#include <asio/buffer.hpp> + +namespace hdfs { + +namespace DataTransferSaslStreamUtil { +Status +ConvertToStatus(const ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg, + std::string *payload); +void PrepareInitialHandshake( + ::hadoop::hdfs::DataTransferEncryptorMessageProto *msg); +} + +template <class Stream> +struct DataTransferSaslStream<Stream>::Authenticator + : continuation::Continuation { + Authenticator(DigestMD5Authenticator *authenticator, + const std::string *request, + hadoop::hdfs::DataTransferEncryptorMessageProto *msg) + : authenticator_(authenticator), request_(request), msg_(msg) {} + + virtual void Run(const Next &next) override { + using namespace ::hadoop::hdfs; + std::string response; + Status status = authenticator_->EvaluateResponse(*request_, &response); + msg_->Clear(); + if (status.ok()) { + // TODO: Handle encryption scheme + msg_->set_payload(response); + msg_->set_status( + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); + } else { + msg_->set_status( + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_ERROR); + } + next(Status::OK()); + } + +private: + DigestMD5Authenticator *authenticator_; + const std::string *request_; + hadoop::hdfs::DataTransferEncryptorMessageProto *msg_; +}; + +template <class Stream> +struct DataTransferSaslStream<Stream>::ReadSaslMessage + : continuation::Continuation { + ReadSaslMessage(Stream *stream, std::string *data) + : stream_(stream), data_(data), read_pb_(stream, &resp_) {} + + virtual void Run(const Next &next) override { + auto handler = [this, next](const Status &status) { + if (status.ok()) { + Status new_stat = + DataTransferSaslStreamUtil::ConvertToStatus(&resp_, data_); + next(new_stat); + } else { + next(status); + } + }; + read_pb_.Run(handler); + } + +private: + Stream *stream_; + std::string *data_; + hadoop::hdfs::DataTransferEncryptorMessageProto resp_; + continuation::ReadDelimitedPBMessageContinuation<Stream, 1024> read_pb_; +}; + +template <class Stream> +template <class Handler> +void DataTransferSaslStream<Stream>::Handshake(const Handler &next) { + using ::hadoop::hdfs::DataTransferEncryptorMessageProto; + using ::hdfs::continuation::Write; + using ::hdfs::continuation::WriteDelimitedPBMessage; + + static const int kMagicNumber = htonl(kDataTransferSasl); + static const asio::const_buffers_1 kMagicNumberBuffer = asio::buffer( + reinterpret_cast<const char *>(&kMagicNumber), sizeof(kMagicNumber)); + + struct State { + DataTransferEncryptorMessageProto req0; + std::string resp0; + DataTransferEncryptorMessageProto req1; + std::string resp1; + Stream *stream; + }; + auto m = continuation::Pipeline<State>::Create(); + State *s = &m->state(); + s->stream = stream_; + + DataTransferSaslStreamUtil::PrepareInitialHandshake(&s->req0); + + m->Push(Write(stream_, kMagicNumberBuffer)) + .Push(WriteDelimitedPBMessage(stream_, &s->req0)) + .Push(new ReadSaslMessage(stream_, &s->resp0)) + .Push(new Authenticator(&authenticator_, &s->resp0, &s->req1)) + .Push(WriteDelimitedPBMessage(stream_, &s->req1)) + .Push(new ReadSaslMessage(stream_, &s->resp1)); + m->Run([next](const Status &status, const State &) { next(status); }); +} + +template <class Stream> +template <class MutableBufferSequence, class ReadHandler> 
+void DataTransferSaslStream<Stream>::async_read_some( + const MutableBufferSequence &buffers, ReadHandler &&handler) { + stream_->async_read_some(buffers, handler); +} + +template <class Stream> +template <typename ConstBufferSequence, typename WriteHandler> +void DataTransferSaslStream<Stream>::async_write_some( + const ConstBufferSequence &buffers, WriteHandler &&handler) { + stream_->async_write_some(buffers, handler); +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc new file mode 100644 index 0000000..68bc4ee --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader.cc @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "block_reader.h" + +namespace hdfs { + +hadoop::hdfs::OpReadBlockProto +ReadBlockProto(const std::string &client_name, bool verify_checksum, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset) { + using namespace hadoop::hdfs; + using namespace hadoop::common; + BaseHeaderProto *base_h = new BaseHeaderProto(); + base_h->set_allocated_block(new ExtendedBlockProto(*block)); + if (token) { + base_h->set_allocated_token(new TokenProto(*token)); + } + ClientOperationHeaderProto *h = new ClientOperationHeaderProto(); + h->set_clientname(client_name); + h->set_allocated_baseheader(base_h); + + OpReadBlockProto p; + p.set_allocated_header(h); + p.set_offset(offset); + p.set_len(length); + p.set_sendchecksums(verify_checksum); + // TODO: p.set_allocated_cachingstrategy(); + return p; +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h new file mode 100644 index 0000000..68ea6ad --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/reader/remote_block_reader_impl.h @@ -0,0 +1,342 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ +#define LIBHDFSPP_READER_REMOTE_BLOCK_READER_IMPL_H_ + +#include "datatransfer.h" +#include "common/continuation/asio.h" +#include "common/continuation/protobuf.h" + +#include <asio/buffers_iterator.hpp> +#include <asio/streambuf.hpp> +#include <asio/write.hpp> + +#include <arpa/inet.h> + +#include <future> + +namespace hdfs { + +hadoop::hdfs::OpReadBlockProto +ReadBlockProto(const std::string &client_name, bool verify_checksum, + const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset); + +template <class Stream> +template <class ConnectHandler> +void RemoteBlockReader<Stream>::async_connect( + const std::string &client_name, const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset, const ConnectHandler &handler) { + // The total number of bytes that we need to transfer from the DN is + // the amount that the user wants (bytesToRead), plus the padding at + // the beginning in order to chunk-align. 
Note that the DN may elect + // to send more than this amount if the read starts/ends mid-chunk. + bytes_to_read_ = length; + + struct State { + std::string header; + hadoop::hdfs::OpReadBlockProto request; + hadoop::hdfs::BlockOpResponseProto response; + }; + + auto m = continuation::Pipeline<State>::Create(); + State *s = &m->state(); + + s->header.insert(s->header.begin(), + {0, kDataTransferVersion, Operation::kReadBlock}); + s->request = std::move(ReadBlockProto(client_name, options_.verify_checksum, + token, block, length, offset)); + + auto read_pb_message = + new continuation::ReadDelimitedPBMessageContinuation<Stream, 16384>( + stream_, &s->response); + + m->Push(continuation::Write(stream_, asio::buffer(s->header))) + .Push(continuation::WriteDelimitedPBMessage(stream_, &s->request)) + .Push(read_pb_message); + + m->Run([this, handler, offset](const Status &status, const State &s) { + Status stat = status; + if (stat.ok()) { + const auto &resp = s.response; + if (resp.status() == ::hadoop::hdfs::Status::SUCCESS) { + if (resp.has_readopchecksuminfo()) { + const auto &checksum_info = resp.readopchecksuminfo(); + chunk_padding_bytes_ = offset - checksum_info.chunkoffset(); + } + state_ = kReadPacketHeader; + } else { + stat = Status::Error(s.response.message().c_str()); + } + } + handler(stat); + }); +} + +template <class Stream> +struct RemoteBlockReader<Stream>::ReadPacketHeader + : continuation::Continuation { + ReadPacketHeader(RemoteBlockReader<Stream> *parent) : parent_(parent) {} + + virtual void Run(const Next &next) override { + parent_->packet_data_read_bytes_ = 0; + parent_->packet_len_ = 0; + auto handler = [next, this](const asio::error_code &ec, size_t) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } else { + parent_->packet_len_ = packet_length(); + parent_->header_.Clear(); + bool v = parent_->header_.ParseFromArray(&buf_[kHeaderStart], + header_length()); + assert(v && "Failed to parse the header"); + parent_->state_ = kReadChecksum; + } + next(status); + }; + + asio::async_read(*parent_->stream_, asio::buffer(buf_), + std::bind(&ReadPacketHeader::CompletionHandler, this, + std::placeholders::_1, std::placeholders::_2), + handler); + } + +private: + static const size_t kMaxHeaderSize = 512; + static const size_t kPayloadLenOffset = 0; + static const size_t kPayloadLenSize = sizeof(int); + static const size_t kHeaderLenOffset = 4; + static const size_t kHeaderLenSize = sizeof(short); + static const size_t kHeaderStart = kPayloadLenSize + kHeaderLenSize; + + RemoteBlockReader<Stream> *parent_; + std::array<char, kMaxHeaderSize> buf_; + + size_t packet_length() const { + return ntohl(*reinterpret_cast<const unsigned *>(&buf_[kPayloadLenOffset])); + } + + size_t header_length() const { + return ntohs(*reinterpret_cast<const short *>(&buf_[kHeaderLenOffset])); + } + + size_t CompletionHandler(const asio::error_code &ec, size_t transferred) { + if (ec) { + return 0; + } else if (transferred < kHeaderStart) { + return kHeaderStart - transferred; + } else { + return kHeaderStart + header_length() - transferred; + } + } +}; + +template <class Stream> +struct RemoteBlockReader<Stream>::ReadChecksum : continuation::Continuation { + ReadChecksum(RemoteBlockReader<Stream> *parent) : parent_(parent) {} + + virtual void Run(const Next &next) override { + auto parent = parent_; + if (parent->state_ != kReadChecksum) { + next(Status::OK()); + return; + } + + auto handler = [parent, next](const asio::error_code &ec, size_t) { + Status status; + 
if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } else { + parent->state_ = + parent->chunk_padding_bytes_ ? kReadPadding : kReadData; + } + next(status); + }; + parent->checksum_.resize(parent->packet_len_ - sizeof(int) - + parent->header_.datalen()); + asio::async_read(*parent->stream_, asio::buffer(parent->checksum_), + handler); + } + +private: + RemoteBlockReader<Stream> *parent_; +}; + +template <class Stream> +struct RemoteBlockReader<Stream>::ReadPadding : continuation::Continuation { + ReadPadding(RemoteBlockReader<Stream> *parent) + : parent_(parent), padding_(parent->chunk_padding_bytes_), + bytes_transferred_(std::make_shared<size_t>(0)), + read_data_(new ReadData<asio::mutable_buffers_1>( + parent, bytes_transferred_, asio::buffer(padding_))) {} + + virtual void Run(const Next &next) override { + if (parent_->state_ != kReadPadding || !parent_->chunk_padding_bytes_) { + next(Status::OK()); + return; + } + + auto h = [next, this](const Status &status) { + if (status.ok()) { + assert(reinterpret_cast<const int &>(*bytes_transferred_) == + parent_->chunk_padding_bytes_); + parent_->chunk_padding_bytes_ = 0; + parent_->state_ = kReadData; + } + next(status); + }; + read_data_->Run(h); + } + +private: + RemoteBlockReader<Stream> *parent_; + std::vector<char> padding_; + std::shared_ptr<size_t> bytes_transferred_; + std::shared_ptr<continuation::Continuation> read_data_; + ReadPadding(const ReadPadding &) = delete; + ReadPadding &operator=(const ReadPadding &) = delete; +}; + +template <class Stream> +template <class MutableBufferSequence> +struct RemoteBlockReader<Stream>::ReadData : continuation::Continuation { + ReadData(RemoteBlockReader<Stream> *parent, + std::shared_ptr<size_t> bytes_transferred, + const MutableBufferSequence &buf) + : parent_(parent), bytes_transferred_(bytes_transferred), buf_(buf) {} + + virtual void Run(const Next &next) override { + auto handler = + [next, this](const asio::error_code &ec, size_t transferred) { + Status status; + if (ec) { + status = Status(ec.value(), ec.message().c_str()); + } + *bytes_transferred_ += transferred; + parent_->bytes_to_read_ -= transferred; + parent_->packet_data_read_bytes_ += transferred; + if (parent_->packet_data_read_bytes_ >= parent_->header_.datalen()) { + parent_->state_ = kReadPacketHeader; + } + next(status); + }; + + auto data_len = + parent_->header_.datalen() - parent_->packet_data_read_bytes_; + async_read(*parent_->stream_, buf_, asio::transfer_exactly(data_len), + handler); + } + +private: + RemoteBlockReader<Stream> *parent_; + std::shared_ptr<size_t> bytes_transferred_; + MutableBufferSequence buf_; +}; + +template <class Stream> +struct RemoteBlockReader<Stream>::AckRead : continuation::Continuation { + AckRead(RemoteBlockReader<Stream> *parent) : parent_(parent) {} + + virtual void Run(const Next &next) override { + if (parent_->bytes_to_read_ > 0) { + next(Status::OK()); + return; + } + + auto m = + continuation::Pipeline<hadoop::hdfs::ClientReadStatusProto>::Create(); + m->state().set_status(parent_->options_.verify_checksum + ? 
hadoop::hdfs::Status::CHECKSUM_OK + : hadoop::hdfs::Status::SUCCESS); + + m->Push( + continuation::WriteDelimitedPBMessage(parent_->stream_, &m->state())); + + m->Run([this, next](const Status &status, + const hadoop::hdfs::ClientReadStatusProto &) { + if (status.ok()) { + parent_->state_ = RemoteBlockReader<Stream>::kFinished; + } + next(status); + }); + } + +private: + RemoteBlockReader<Stream> *parent_; +}; + +template <class Stream> +template <class MutableBufferSequence, class ReadHandler> +void RemoteBlockReader<Stream>::async_read_some( + const MutableBufferSequence &buffers, const ReadHandler &handler) { + assert(state_ != kOpen && "Not connected"); + + struct State { + std::shared_ptr<size_t> bytes_transferred; + }; + auto m = continuation::Pipeline<State>::Create(); + m->state().bytes_transferred = std::make_shared<size_t>(0); + + m->Push(new ReadPacketHeader(this)) + .Push(new ReadChecksum(this)) + .Push(new ReadPadding(this)) + .Push(new ReadData<MutableBufferSequence>( + this, m->state().bytes_transferred, buffers)) + .Push(new AckRead(this)); + + auto self = this->shared_from_this(); + m->Run([self, handler](const Status &status, const State &state) { + handler(status, *state.bytes_transferred); + }); +} + +template <class Stream> +template <class MutableBufferSequence> +size_t +RemoteBlockReader<Stream>::read_some(const MutableBufferSequence &buffers, + Status *status) { + size_t transferred = 0; + auto done = std::make_shared<std::promise<void>>(); + auto future = done->get_future(); + async_read_some(buffers, + [status, &transferred, done](const Status &stat, size_t t) { + *status = stat; + transferred = t; + done->set_value(); + }); + future.wait(); + return transferred; +} + +template <class Stream> +Status RemoteBlockReader<Stream>::connect( + const std::string &client_name, const hadoop::common::TokenProto *token, + const hadoop::hdfs::ExtendedBlockProto *block, uint64_t length, + uint64_t offset) { + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future(stat->get_future()); + async_connect(client_name, token, block, length, offset, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt new file mode 100644 index 0000000..aa3951c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/CMakeLists.txt @@ -0,0 +1,3 @@ +include_directories(${OPENSSL_INCLUDE_DIRS}) +add_library(rpc rpc_connection.cc rpc_engine.cc) +add_dependencies(rpc proto) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc new file mode 100644 index 0000000..8c4130f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc 
@@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "rpc_engine.h" + +#include "RpcHeader.pb.h" +#include "ProtobufRpcEngine.pb.h" +#include "IpcConnectionContext.pb.h" + +#include "common/logging.h" +#include "common/util.h" + +#include <asio/read.hpp> + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> + +namespace hdfs { + +namespace pb = ::google::protobuf; +namespace pbio = ::google::protobuf::io; + +using namespace ::hadoop::common; +using namespace ::std::placeholders; + +static void +ConstructPacket(std::string *res, + std::initializer_list<const pb::MessageLite *> headers, + const std::string *request) { + int len = 0; + std::for_each( + headers.begin(), headers.end(), + [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); + if (request) { + len += pbio::CodedOutputStream::VarintSize32(request->size()) + + request->size(); + } + + int net_len = htonl(len); + res->reserve(res->size() + sizeof(net_len) + len); + + pbio::StringOutputStream ss(res); + pbio::CodedOutputStream os(&ss); + os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); + + uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); + + std::for_each( + headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { + buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); + buf = v->SerializeWithCachedSizesToArray(buf); + }); + + if (request) { + buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf); + buf = os.WriteStringToArray(*request, buf); + } +} + +static void SetRequestHeader(RpcEngine *engine, int call_id, + const std::string &method_name, + RpcRequestHeaderProto *rpc_header, + RequestHeaderProto *req_header) { + rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); + rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); + rpc_header->set_callid(call_id); + rpc_header->set_clientid(engine->client_name()); + + req_header->set_methodname(method_name); + req_header->set_declaringclassprotocolname(engine->protocol_name()); + req_header->set_clientprotocolversion(engine->protocol_version()); +} + +RpcConnection::~RpcConnection() {} + +RpcConnection::Request::Request(RpcConnection *parent, + const std::string &method_name, + const std::string &request, Handler &&handler) + : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), + handler_(std::move(handler)) { + RpcRequestHeaderProto rpc_header; + RequestHeaderProto req_header; + SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, + &req_header); + ConstructPacket(&payload_, {&rpc_header, &req_header}, &request); +} + +RpcConnection::Request::Request(RpcConnection *parent, + const std::string &method_name, + const 
pb::MessageLite *request, + Handler &&handler) + : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), + handler_(std::move(handler)) { + RpcRequestHeaderProto rpc_header; + RequestHeaderProto req_header; + SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, + &req_header); + ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr); +} + +void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is, + const Status &status) { + handler_(is, status); +} + +RpcConnection::RpcConnection(RpcEngine *engine) + : engine_(engine), resp_state_(kReadLength), resp_length_(0) {} + +::asio::io_service &RpcConnection::io_service() { + return engine_->io_service(); +} + +void RpcConnection::Start() { + io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this, + ::asio::error_code(), 0)); +} + +void RpcConnection::FlushPendingRequests() { + io_service().post([this]() { + if (!request_over_the_wire_) { + OnSendCompleted(::asio::error_code(), 0); + } + }); +} + +void RpcConnection::HandleRpcResponse(const std::vector<char> &data) { + /* assumed to be called from a context that has already acquired the + * engine_state_lock */ + pbio::ArrayInputStream ar(&data[0], data.size()); + pbio::CodedInputStream in(&ar); + in.PushLimit(data.size()); + RpcResponseHeaderProto h; + ReadDelimitedPBMessage(&in, &h); + + auto req = RemoveFromRunningQueue(h.callid()); + if (!req) { + LOG_WARN() << "RPC response with Unknown call id " << h.callid(); + return; + } + + Status stat; + if (h.has_exceptionclassname()) { + stat = + Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); + } + req->OnResponseArrived(&in, stat); +} + +void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req, + const ::asio::error_code &ec) { + if (ec.value() == asio::error::operation_aborted) { + return; + } + + std::lock_guard<std::mutex> state_lock(engine_state_lock_); + auto r = RemoveFromRunningQueue(req->call_id()); + if (!r) { + // The RPC might have been finished and removed from the queue + return; + } + + Status stat = ToStatus(ec ? 
ec : make_error_code(::asio::error::timed_out)); + + r->OnResponseArrived(nullptr, stat); +} + +std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() { + static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c', + RpcEngine::kRpcVersion, 0, 0}; + auto res = + std::make_shared<std::string>(kHandshakeHeader, sizeof(kHandshakeHeader)); + + RpcRequestHeaderProto h; + h.set_rpckind(RPC_PROTOCOL_BUFFER); + h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); + h.set_callid(RpcEngine::kCallIdConnectionContext); + h.set_clientid(engine_->client_name()); + + IpcConnectionContextProto handshake; + handshake.set_protocol(engine_->protocol_name()); + ConstructPacket(res.get(), {&h, &handshake}, nullptr); + return res; +} + +void RpcConnection::AsyncRpc( + const std::string &method_name, const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + const Callback &handler) { + std::lock_guard<std::mutex> state_lock(engine_state_lock_); + + auto wrapped_handler = + [resp, handler](pbio::CodedInputStream *is, const Status &status) { + if (status.ok()) { + ReadDelimitedPBMessage(is, resp.get()); + } + handler(status); + }; + + auto r = std::make_shared<Request>(this, method_name, req, + std::move(wrapped_handler)); + pending_requests_.push_back(r); + FlushPendingRequests(); +} + +void RpcConnection::AsyncRawRpc(const std::string &method_name, + const std::string &req, + std::shared_ptr<std::string> resp, + Callback &&handler) { + std::lock_guard<std::mutex> state_lock(engine_state_lock_); + + auto wrapped_handler = + [this, resp, handler](pbio::CodedInputStream *is, const Status &status) { + if (status.ok()) { + uint32_t size = 0; + is->ReadVarint32(&size); + auto limit = is->PushLimit(size); + is->ReadString(resp.get(), limit); + is->PopLimit(limit); + } + handler(status); + }; + + auto r = std::make_shared<Request>(this, method_name, req, + std::move(wrapped_handler)); + pending_requests_.push_back(r); + FlushPendingRequests(); +} + +void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { + Shutdown(); + std::vector<std::shared_ptr<Request>> requests; + std::transform(requests_on_fly_.begin(), requests_on_fly_.end(), + std::back_inserter(requests), + std::bind(&RequestOnFlyMap::value_type::second, _1)); + requests_on_fly_.clear(); + requests.insert(requests.end(), + std::make_move_iterator(pending_requests_.begin()), + std::make_move_iterator(pending_requests_.end())); + pending_requests_.clear(); + for (const auto &req : requests) { + req->OnResponseArrived(nullptr, ToStatus(ec)); + } +} + +std::shared_ptr<RpcConnection::Request> +RpcConnection::RemoveFromRunningQueue(int call_id) { + auto it = requests_on_fly_.find(call_id); + if (it == requests_on_fly_.end()) { + return std::shared_ptr<Request>(); + } + + auto req = it->second; + requests_on_fly_.erase(it); + return req; +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h new file mode 100644 index 0000000..439a730 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_CONNECTION_H_ +#define LIB_RPC_RPC_CONNECTION_H_ + +#include "rpc_engine.h" + +#include "common/logging.h" +#include "common/util.h" + +#include <asio/connect.hpp> +#include <asio/read.hpp> +#include <asio/write.hpp> + +namespace hdfs { + +template <class NextLayer> class RpcConnectionImpl : public RpcConnection { +public: + RpcConnectionImpl(RpcEngine *engine); + virtual void Connect(const ::asio::ip::tcp::endpoint &server, + Callback &&handler) override; + virtual void Handshake(Callback &&handler) override; + virtual void Shutdown() override; + virtual void OnSendCompleted(const ::asio::error_code &ec, + size_t transferred) override; + virtual void OnRecvCompleted(const ::asio::error_code &ec, + size_t transferred) override; + + NextLayer &next_layer() { return next_layer_; } +private: + const Options options_; + NextLayer next_layer_; +}; + +template <class NextLayer> +RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine) + : RpcConnection(engine), options_(engine->options()), + next_layer_(engine->io_service()) {} + +template <class NextLayer> +void RpcConnectionImpl<NextLayer>::Connect( + const ::asio::ip::tcp::endpoint &server, Callback &&handler) { + next_layer_.async_connect(server, + [handler](const ::asio::error_code &ec) { + handler(ToStatus(ec)); + }); +} + +template <class NextLayer> +void RpcConnectionImpl<NextLayer>::Handshake(Callback &&handler) { + auto handshake_packet = PrepareHandshakePacket(); + ::asio::async_write( + next_layer_, asio::buffer(*handshake_packet), + [handshake_packet, handler](const ::asio::error_code &ec, size_t) { + handler(ToStatus(ec)); + }); +} + +template <class NextLayer> +void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec, + size_t) { + using std::placeholders::_1; + using std::placeholders::_2; + std::lock_guard<std::mutex> state_lock(engine_state_lock_); + + request_over_the_wire_.reset(); + if (ec) { + // Current RPC has failed -- abandon the + // connection and do proper clean up + ClearAndDisconnect(ec); + return; + } + + if (!pending_requests_.size()) { + return; + } + + std::shared_ptr<Request> req = pending_requests_.front(); + pending_requests_.erase(pending_requests_.begin()); + requests_on_fly_[req->call_id()] = req; + request_over_the_wire_ = req; + + req->timer().expires_from_now( + std::chrono::milliseconds(options_.rpc_timeout)); + req->timer().async_wait(std::bind( + &RpcConnectionImpl<NextLayer>::HandleRpcTimeout, this, req, _1)); + + asio::async_write( + next_layer_, asio::buffer(req->payload()), + std::bind(&RpcConnectionImpl<NextLayer>::OnSendCompleted, this, _1, _2)); +} + +template <class NextLayer> +void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec, + size_t) { + using 
std::placeholders::_1; + using std::placeholders::_2; + std::lock_guard<std::mutex> state_lock(engine_state_lock_); + + switch (ec.value()) { + case 0: + // No errors + break; + case asio::error::operation_aborted: + // The event loop has been shut down. Ignore the error. + return; + default: + LOG_WARN() << "Network error during RPC: " << ec.message(); + ClearAndDisconnect(ec); + return; + } + + if (resp_state_ == kReadLength) { + resp_state_ = kReadContent; + auto buf = ::asio::buffer(reinterpret_cast<char *>(&resp_length_), + sizeof(resp_length_)); + asio::async_read(next_layer_, buf, + std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted, + this, _1, _2)); + + } else if (resp_state_ == kReadContent) { + resp_state_ = kParseResponse; + resp_length_ = ntohl(resp_length_); + resp_data_.resize(resp_length_); + asio::async_read(next_layer_, ::asio::buffer(resp_data_), + std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted, + this, _1, _2)); + + } else if (resp_state_ == kParseResponse) { + resp_state_ = kReadLength; + HandleRpcResponse(resp_data_); + resp_data_.clear(); + Start(); + } +} + +template <class NextLayer> void RpcConnectionImpl<NextLayer>::Shutdown() { + next_layer_.cancel(); + next_layer_.close(); +} +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc new file mode 100644 index 0000000..83721a7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "rpc_engine.h" +#include "rpc_connection.h" +#include "common/util.h" + +#include <openssl/rand.h> + +#include <sstream> +#include <future> + +namespace hdfs { + +RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, + const std::string &client_name, const char *protocol_name, + int protocol_version) + : io_service_(io_service), options_(options), client_name_(client_name), + protocol_name_(protocol_name), protocol_version_(protocol_version), + call_id_(0) { +} + +void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server, + const std::function<void(const Status &)> &handler) { + conn_.reset(new RpcConnectionImpl<::asio::ip::tcp::socket>(this)); + conn_->Connect(server, [this, handler](const Status &stat) { + if (!stat.ok()) { + handler(stat); + } else { + conn_->Handshake([handler](const Status &s) { handler(s); }); + } + }); +} + +void RpcEngine::Start() { conn_->Start(); } + +void RpcEngine::Shutdown() { + io_service_->post([this]() { conn_->Shutdown(); }); +} + +void RpcEngine::TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn) { + conn_.reset(conn->release()); +} + +void RpcEngine::AsyncRpc( + const std::string &method_name, const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp, + const std::function<void(const Status &)> &handler) { + conn_->AsyncRpc(method_name, req, resp, handler); +} + +Status +RpcEngine::Rpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp) { + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future(stat->get_future()); + AsyncRpc(method_name, req, resp, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} + +Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, + std::shared_ptr<std::string> resp) { + auto stat = std::make_shared<std::promise<Status>>(); + std::future<Status> future(stat->get_future()); + conn_->AsyncRawRpc(method_name, req, resp, + [stat](const Status &status) { stat->set_value(status); }); + return future.get(); +} + +std::string RpcEngine::GetRandomClientName() { + unsigned char buf[6] = { + 0, + }; + RAND_pseudo_bytes(buf, sizeof(buf)); + + std::stringstream ss; + ss << "libhdfs++_" + << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf))); + return ss.str(); +} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h new file mode 100644 index 0000000..ee04fd5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIB_RPC_RPC_ENGINE_H_ +#define LIB_RPC_RPC_ENGINE_H_ + +#include "libhdfspp/options.h" +#include "libhdfspp/status.h" + +#include <google/protobuf/message_lite.h> + +#include <asio/ip/tcp.hpp> +#include <asio/deadline_timer.hpp> + +#include <atomic> +#include <memory> +#include <unordered_map> +#include <vector> +#include <mutex> + +namespace hdfs { + +class RpcEngine; +class RpcConnection { +public: + typedef std::function<void(const Status &)> Callback; + virtual ~RpcConnection(); + RpcConnection(RpcEngine *engine); + virtual void Connect(const ::asio::ip::tcp::endpoint &server, + Callback &&handler) = 0; + virtual void Handshake(Callback &&handler) = 0; + virtual void Shutdown() = 0; + + void Start(); + void AsyncRpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + std::shared_ptr<::google::protobuf::MessageLite> resp, + const Callback &handler); + + void AsyncRawRpc(const std::string &method_name, const std::string &request, + std::shared_ptr<std::string> resp, Callback &&handler); + +protected: + class Request; + RpcEngine *const engine_; + virtual void OnSendCompleted(const ::asio::error_code &ec, + size_t transferred) = 0; + virtual void OnRecvCompleted(const ::asio::error_code &ec, + size_t transferred) = 0; + + ::asio::io_service &io_service(); + std::shared_ptr<std::string> PrepareHandshakePacket(); + static std::string + SerializeRpcRequest(const std::string &method_name, + const ::google::protobuf::MessageLite *req); + void HandleRpcResponse(const std::vector<char> &data); + void HandleRpcTimeout(std::shared_ptr<Request> req, + const ::asio::error_code &ec); + void FlushPendingRequests(); + void ClearAndDisconnect(const ::asio::error_code &ec); + std::shared_ptr<Request> RemoveFromRunningQueue(int call_id); + + enum ResponseState { + kReadLength, + kReadContent, + kParseResponse, + } resp_state_; + unsigned resp_length_; + std::vector<char> resp_data_; + + class Request { + public: + typedef std::function<void(::google::protobuf::io::CodedInputStream *is, + const Status &status)> Handler; + Request(RpcConnection *parent, const std::string &method_name, + const std::string &request, Handler &&callback); + Request(RpcConnection *parent, const std::string &method_name, + const ::google::protobuf::MessageLite *request, Handler &&callback); + + int call_id() const { return call_id_; } + ::asio::deadline_timer &timer() { return timer_; } + const std::string &payload() const { return payload_; } + void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, + const Status &status); + + private: + const int call_id_; + ::asio::deadline_timer timer_; + std::string payload_; + Handler handler_; + }; + + // The request being sent over the wire + std::shared_ptr<Request> request_over_the_wire_; + // Requests to be sent over the wire + std::vector<std::shared_ptr<Request>> pending_requests_; + // Requests that are waiting for responses + typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap; + RequestOnFlyMap requests_on_fly_; + // Lock for mutable parts of this class that need to be thread safe + 
std::mutex engine_state_lock_; +}; + +class RpcEngine { +public: + enum { kRpcVersion = 9 }; + enum { + kCallIdAuthorizationFailed = -1, + kCallIdInvalid = -2, + kCallIdConnectionContext = -3, + kCallIdPing = -4 + }; + + RpcEngine(::asio::io_service *io_service, const Options &options, + const std::string &client_name, const char *protocol_name, + int protocol_version); + + void AsyncRpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp, + const std::function<void(const Status &)> &handler); + + Status Rpc(const std::string &method_name, + const ::google::protobuf::MessageLite *req, + const std::shared_ptr<::google::protobuf::MessageLite> &resp); + /** + * Send raw bytes as RPC payload. This is intended to be used in JNI + * bindings only. + **/ + Status RawRpc(const std::string &method_name, const std::string &req, + std::shared_ptr<std::string> resp); + void Connect(const ::asio::ip::tcp::endpoint &server, + const std::function<void(const Status &)> &handler); + void Start(); + void Shutdown(); + void TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn); + + int NextCallId() { return ++call_id_; } + + const std::string &client_name() const { return client_name_; } + const std::string &protocol_name() const { return protocol_name_; } + int protocol_version() const { return protocol_version_; } + ::asio::io_service &io_service() { return *io_service_; } + const Options &options() { return options_; } + static std::string GetRandomClientName(); + +private: + ::asio::io_service *io_service_; + Options options_; + const std::string client_name_; + const std::string protocol_name_; + const int protocol_version_; + std::atomic_int call_id_; + std::unique_ptr<RpcConnection> conn_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt new file mode 100644 index 0000000..eca878e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/CMakeLists.txt @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +add_library(test_common OBJECT mock_connection.cc) + +set(PROTOBUF_IMPORT_DIRS ${PROTO_HADOOP_TEST_DIR}) + +protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS + ${PROTO_HADOOP_TEST_DIR}/test.proto + ${PROTO_HADOOP_TEST_DIR}/test_rpc_service.proto +) + +add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>) +target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +add_test(remote_block_reader remote_block_reader_test) + +add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc) +target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +add_test(sasl_digest_md5 sasl_digest_md5_test) + +add_executable(inputstream_test inputstream_test.cc) +target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +add_test(inputstream inputstream_test) + +include_directories(${CMAKE_CURRENT_BINARY_DIR}) +add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>) +target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) +add_test(rpc_engine rpc_engine_test) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc new file mode 100644 index 0000000..aa95256 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
new file mode 100644
index 0000000..aa95256
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/inputstream_test.cc
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "fs/filesystem.h"
+#include <gmock/gmock.h>
+
+using hadoop::common::TokenProto;
+using hadoop::hdfs::DatanodeInfoProto;
+using hadoop::hdfs::DatanodeIDProto;
+using hadoop::hdfs::ExtendedBlockProto;
+using hadoop::hdfs::LocatedBlockProto;
+using hadoop::hdfs::LocatedBlocksProto;
+
+using ::testing::_;
+using ::testing::InvokeArgument;
+using ::testing::Return;
+
+using namespace hdfs;
+
+namespace hdfs {
+
+class MockReader {
+public:
+  virtual ~MockReader() {}
+  MOCK_METHOD2(
+      async_read_some,
+      void(const asio::mutable_buffers_1 &,
+           const std::function<void(const Status &, size_t transferred)> &));
+
+  MOCK_METHOD6(async_connect,
+               void(const std::string &, TokenProto *, ExtendedBlockProto *,
+                    uint64_t, uint64_t,
+                    const std::function<void(const Status &)> &));
+};
+
+template <class Trait> struct MockBlockReaderTrait {
+  typedef MockReader Reader;
+  struct State {
+    MockReader reader_;
+    size_t transferred_;
+    Reader *reader() { return &reader_; }
+    size_t *transferred() { return &transferred_; }
+    const size_t *transferred() const { return &transferred_; }
+  };
+
+  static continuation::Pipeline<State> *
+  CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) {
+    auto m = continuation::Pipeline<State>::Create();
+    *m->state().transferred() = 0;
+    Trait::InitializeMockReader(m->state().reader());
+    return m;
+  }
+};
+}
+
+TEST(InputStreamTest, TestReadSingleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(InputStreamTest, TestReadMultipleTrunk) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .Times(4)
+          .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_TRUE(stat.ok());
+  ASSERT_EQ(sizeof(buf), read);
+  read = 0;
+}
+
+TEST(InputStreamTest, TestReadError) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto block;
+  DatanodeInfoProto dn;
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4))
+          .WillOnce(InvokeArgument<1>(Status::Error("error"), 0));
+    }
+  };
+
+  is.AsyncReadBlock<MockBlockReaderTrait<Trait>>(
+      "client", block, dn, 0, asio::buffer(buf, sizeof(buf)),
+      [&stat, &read](const Status &status, const std::string &,
+                     size_t transferred) {
+        stat = status;
+        read = transferred;
+      });
+  ASSERT_FALSE(stat.ok());
+  ASSERT_EQ(sizeof(buf) / 4 * 3, read);
+  read = 0;
+}
+
+TEST(InputStreamTest, TestExcludeDataNode) {
+  LocatedBlocksProto blocks;
+  LocatedBlockProto *block = blocks.add_blocks();
+  ExtendedBlockProto *b = block->mutable_b();
+  b->set_poolid("");
+  b->set_blockid(1);
+  b->set_generationstamp(1);
+  b->set_numbytes(4096);
+
+  DatanodeInfoProto *di = block->add_locs();
+  DatanodeIDProto *dnid = di->mutable_id();
+  dnid->set_datanodeuuid("foo");
+
+  char buf[4096] = {
+      0,
+  };
+  IoServiceImpl io_service;
+  Options options;
+  FileSystemImpl fs(&io_service, options);
+  InputStreamImpl is(&fs, &blocks);
+  Status stat;
+  size_t read = 0;
+  struct Trait {
+    static void InitializeMockReader(MockReader *reader) {
+      EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _))
+          .WillOnce(InvokeArgument<5>(Status::OK()));
+
+      EXPECT_CALL(*reader, async_read_some(_, _))
+          .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf)));
+    }
+  };
+
+  std::set<std::string> excluded_dn({"foo"});
+  is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn,
+                    [&stat, &read](const Status &status, const std::string &,
+                                   size_t transferred) {
+                      stat = status;
+                      read = transferred;
+                    });
+  ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again),
+            stat.code());
+  ASSERT_EQ(0UL, read);
+}
+
+int main(int argc, char *argv[]) {
+  // The following line must be executed to initialize Google Mock
+  // (and Google Test) before running the tests.
+  ::testing::InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}
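The tests above lean on gmock's InvokeArgument<N> action to complete asynchronous callbacks inline: the mocked call immediately invokes its N-th argument, so the whole async chain unwinds synchronously inside the test. A self-contained sketch of that pattern (names here are illustrative, not from the patch; runs under gmock_main like the tests above):

#include <gmock/gmock.h>
#include <functional>

struct MockAsyncThing {
  // Takes a completion callback, as the readers above do.
  MOCK_METHOD1(async_op, void(const std::function<void(int)> &));
};

TEST(InvokeArgumentSketch, DrivesCallbackInline) {
  MockAsyncThing m;
  // When async_op is called, immediately invoke its argument 0 with 42,
  // standing in for "the I/O completed with this result".
  EXPECT_CALL(m, async_op(::testing::_))
      .WillOnce(::testing::InvokeArgument<0>(42));
  int result = 0;
  m.async_op([&result](int v) { result = v; });
  ASSERT_EQ(42, result);  // callback already ran; no event loop needed
}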
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc
new file mode 100644
index 0000000..93a3099
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.cc
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+namespace hdfs {
+
+MockConnectionBase::MockConnectionBase(::asio::io_service *io_service)
+    : io_service_(io_service) {}
+
+MockConnectionBase::~MockConnectionBase() {}
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
new file mode 100644
index 0000000..8c0ef8c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/mock_connection.h
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_
+
+#include <asio/error_code.hpp>
+#include <asio/buffer.hpp>
+#include <asio/streambuf.hpp>
+#include <asio/io_service.hpp>
+
+#include <gmock/gmock.h>
+
+namespace hdfs {
+
+class MockConnectionBase {
+public:
+  MockConnectionBase(::asio::io_service *io_service);
+  virtual ~MockConnectionBase();
+  typedef std::pair<asio::error_code, std::string> ProducerResult;
+  template <class MutableBufferSequence, class Handler>
+  void async_read_some(const MutableBufferSequence &buf, Handler &&handler) {
+    if (produced_.size() == 0) {
+      ProducerResult r = Produce();
+      if (r.first) {
+        io_service_->post(std::bind(handler, r.first, 0));
+        return;
+      }
+      asio::mutable_buffers_1 data = produced_.prepare(r.second.size());
+      asio::buffer_copy(data, asio::buffer(r.second));
+      produced_.commit(r.second.size());
+    }
+
+    size_t len = std::min(asio::buffer_size(buf), produced_.size());
+    asio::buffer_copy(buf, produced_.data());
+    produced_.consume(len);
+    io_service_->post(std::bind(handler, asio::error_code(), len));
+  }
+
+  template <class ConstBufferSequence, class Handler>
+  void async_write_some(const ConstBufferSequence &buf, Handler &&handler) {
+    // CompletionResult res = OnWrite(buf);
+    io_service_->post(
+        std::bind(handler, asio::error_code(), asio::buffer_size(buf)));
+  }
+
+protected:
+  virtual ProducerResult Produce() = 0;
+  ::asio::io_service *io_service_;
+
+private:
+  asio::streambuf produced_;
+};
+}
+
+#endif
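MockConnectionBase leaves exactly one hook, Produce(), which supplies the next chunk of "server" bytes (or an error) whenever a read finds the internal streambuf empty. The test below fills that hook with gmock; where expectations are overkill, a plain subclass works too. A hedged sketch assuming only the header above (CannedConnection and its members are illustrative):

#include "mock_connection.h"

#include <asio/error.hpp>  // asio::error::eof
#include <string>
#include <utility>

namespace hdfs {

// Feeds a single canned server response, then reports EOF on the next read.
class CannedConnection : public MockConnectionBase {
public:
  CannedConnection(::asio::io_service *io_service, std::string response)
      : MockConnectionBase(io_service), response_(std::move(response)) {}

protected:
  ProducerResult Produce() override {
    if (response_.empty()) {
      // Nothing left to serve: fail the pending read with EOF.
      return std::make_pair(
          ::asio::error::make_error_code(::asio::error::eof), std::string());
    }
    std::string r;
    r.swap(response_);  // hand the whole canned payload to the reader
    return std::make_pair(::asio::error_code(), r);
  }

private:
  std::string response_;
};
}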
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
new file mode 100644
index 0000000..388a106
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-native-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "mock_connection.h"
+
+#include "datatransfer.pb.h"
+#include "common/util.h"
+#include "reader/block_reader.h"
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+using namespace hdfs;
+
+using ::hadoop::common::TokenProto;
+using ::hadoop::hdfs::BlockOpResponseProto;
+using ::hadoop::hdfs::ChecksumProto;
+using ::hadoop::hdfs::DataTransferEncryptorMessageProto;
+using ::hadoop::hdfs::ExtendedBlockProto;
+using ::hadoop::hdfs::PacketHeaderProto;
+using ::hadoop::hdfs::ReadOpChecksumInfoProto;
+
+using ::asio::buffer;
+using ::asio::error_code;
+using ::asio::mutable_buffers_1;
+using ::testing::Return;
+using std::make_pair;
+using std::string;
+
+namespace pb = ::google::protobuf;
+namespace pbio = pb::io;
+
+namespace hdfs {
+
+class MockDNConnection : public MockConnectionBase {
+public:
+  MockDNConnection(::asio::io_service &io_service)
+      : MockConnectionBase(&io_service) {}
+  MOCK_METHOD0(Produce, ProducerResult());
+};
+}
+
+static inline string ToDelimitedString(const pb::MessageLite *msg) {
+  string res;
+  res.reserve(hdfs::DelimitedPBMessageSize(msg));
+  pbio::StringOutputStream os(&res);
+  pbio::CodedOutputStream out(&os);
+  out.WriteVarint32(msg->ByteSize());
+  msg->SerializeToCodedStream(&out);
+  return res;
+}
+
+static inline std::pair<error_code, string> Produce(const std::string &s) {
+  return make_pair(error_code(), s);
+}
+
+static inline std::pair<error_code, string>
+ProducePacket(const std::string &data, const std::string &checksum,
+              int offset_in_block, int seqno, bool last_packet) {
+  PacketHeaderProto proto;
+  proto.set_datalen(data.size());
+  proto.set_offsetinblock(offset_in_block);
+  proto.set_seqno(seqno);
+  proto.set_lastpacketinblock(last_packet);
+
+  char prefix[6];
+  *reinterpret_cast<unsigned *>(prefix) =
+      htonl(data.size() + checksum.size() + sizeof(int));
+  *reinterpret_cast<short *>(prefix + sizeof(int)) = htons(proto.ByteSize());
+  std::string payload(prefix, sizeof(prefix));
+  payload.reserve(payload.size() + proto.ByteSize() + checksum.size() +
+                  data.size());
+  proto.AppendToString(&payload);
+  payload += checksum;
+  payload += data;
+  return std::make_pair(error_code(), std::move(payload));
+}
+
+template <class Stream = MockDNConnection, class Handler>
+static std::shared_ptr<RemoteBlockReader<Stream>>
+ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block,
+            uint64_t length, uint64_t offset, const mutable_buffers_1 &buf,
+            const Handler &handler) {
+  BlockReaderOptions options;
+  auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn);
+  Status result;
+  reader->async_connect("libhdfs++", token, &block, length, offset,
+                        [buf, reader, handler](const Status &stat) {
+                          if (!stat.ok()) {
+                            handler(stat, 0);
+                          } else {
+                            reader->async_read_some(buf, handler);
+                          }
+                        });
+  return reader;
+}
+
+TEST(RemoteBlockReaderTest, TestReadWholeBlock) {
+  static const size_t kChunkSize = 512;
+  static const string kChunkData(kChunkSize, 'a');
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
+  BlockOpResponseProto block_op_resp;
+
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  std::string data(kChunkSize, 0);
+  ReadContent(&conn, nullptr, block, kChunkSize, 0,
+              buffer(const_cast<char *>(data.c_str()), data.size()),
+              [&data, &io_service](const Status &stat, size_t transferred) {
+                ASSERT_TRUE(stat.ok());
+                ASSERT_EQ(kChunkSize, transferred);
+                ASSERT_EQ(kChunkData, data);
+                io_service.stop();
+              });
+  io_service.run();
+}
+
+TEST(RemoteBlockReaderTest, TestReadWithinChunk) {
+  static const size_t kChunkSize = 1024;
+  static const size_t kLength = kChunkSize / 4 * 3;
+  static const size_t kOffset = kChunkSize / 4;
+  static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b');
+
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
+  BlockOpResponseProto block_op_resp;
+  ReadOpChecksumInfoProto *checksum_info =
+      block_op_resp.mutable_readopchecksuminfo();
+  checksum_info->set_chunkoffset(0);
+  ChecksumProto *checksum = checksum_info->mutable_checksum();
+  checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL);
+  checksum->set_bytesperchecksum(512);
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  string data(kLength, 0);
+  ReadContent(&conn, nullptr, block, data.size(), kOffset,
+              buffer(const_cast<char *>(data.c_str()), data.size()),
+              [&data, &io_service](const Status &stat, size_t transferred) {
+                ASSERT_TRUE(stat.ok());
+                ASSERT_EQ(kLength, transferred);
+                ASSERT_EQ(kChunkData.substr(kOffset, kLength), data);
+                io_service.stop();
+              });
+  io_service.run();
+}
+
+TEST(RemoteBlockReaderTest, TestReadMultiplePacket) {
+  static const size_t kChunkSize = 1024;
+  static const string kChunkData(kChunkSize, 'a');
+
+  ::asio::io_service io_service;
+  MockDNConnection conn(io_service);
+  BlockOpResponseProto block_op_resp;
+  block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS);
+
+  EXPECT_CALL(conn, Produce())
+      .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp))))
+      .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false)))
+      .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true)));
+
+  ExtendedBlockProto block;
+  block.set_poolid("foo");
+  block.set_blockid(0);
+  block.set_generationstamp(0);
+
+  string data(kChunkSize, 0);
+  mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size());
+  BlockReaderOptions options;
+  auto reader =
+      std::make_shared<RemoteBlockReader<MockDNConnection>>(options, &conn);
+  Status result;
+  reader->async_connect(
+      "libhdfs++", nullptr, &block, data.size(), 0,
+      [buf, reader, &data, &io_service](const Status &stat) {
+        ASSERT_TRUE(stat.ok());
+        reader->async_read_some(
+            buf, [buf, reader, &data, &io_service](const Status &stat,
+                                                   size_t transferred) {
+              ASSERT_TRUE(stat.ok());
+              ASSERT_EQ(kChunkSize, transferred);
+              ASSERT_EQ(kChunkData, data);
+              data.clear();
+              data.resize(kChunkSize);
+              transferred = 0;
+              reader->async_read_some(
+                  buf, [&data, &io_service](const Status &stat,
+                                            size_t transferred) {
+                    ASSERT_TRUE(stat.ok());
+                    ASSERT_EQ(kChunkSize, transferred);
+                    ASSERT_EQ(kChunkData, data);
+                    io_service.stop();
+                  });
+            });
+      });
+  io_service.run();
+}
+
+ "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," + "charset=utf-8,algorithm=md5-sess"; + ::asio::io_service io_service; + MockDNConnection conn(io_service); + BlockOpResponseProto block_op_resp; + block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); + + DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1; + sasl_resp0.set_status( + ::hadoop::hdfs:: + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); + sasl_resp0.set_payload(kAuthPayload); + sasl_resp1.set_status( + ::hadoop::hdfs:: + DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); + + EXPECT_CALL(conn, Produce()) + .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0)))) + .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1)))) + .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) + .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); + + DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar"); + ExtendedBlockProto block; + block.set_poolid("foo"); + block.set_blockid(0); + block.set_generationstamp(0); + + std::string data(kChunkSize, 0); + sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service]( + const Status &s) { + ASSERT_TRUE(s.ok()); + ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0, + buffer(const_cast<char *>(data.c_str()), data.size()), + [&data, &io_service](const Status &stat, size_t transferred) { + ASSERT_TRUE(stat.ok()); + ASSERT_EQ(kChunkSize, transferred); + ASSERT_EQ(kChunkData, data); + io_service.stop(); + }); + }); + io_service.run(); +} + +int main(int argc, char *argv[]) { + // The following line must be executed to initialize Google Mock + // (and Google Test) before running the tests. + ::testing::InitGoogleMock(&argc, argv); + return RUN_ALL_TESTS(); +}
