http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc deleted file mode 100644 index 8c4130f..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.cc +++ /dev/null @@ -1,270 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rpc_engine.h" - -#include "RpcHeader.pb.h" -#include "ProtobufRpcEngine.pb.h" -#include "IpcConnectionContext.pb.h" - -#include "common/logging.h" -#include "common/util.h" - -#include <asio/read.hpp> - -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> - -namespace hdfs { - -namespace pb = ::google::protobuf; -namespace pbio = ::google::protobuf::io; - -using namespace ::hadoop::common; -using namespace ::std::placeholders; - -static void -ConstructPacket(std::string *res, - std::initializer_list<const pb::MessageLite *> headers, - const std::string *request) { - int len = 0; - std::for_each( - headers.begin(), headers.end(), - [&len](const pb::MessageLite *v) { len += DelimitedPBMessageSize(v); }); - if (request) { - len += pbio::CodedOutputStream::VarintSize32(request->size()) + - request->size(); - } - - int net_len = htonl(len); - res->reserve(res->size() + sizeof(net_len) + len); - - pbio::StringOutputStream ss(res); - pbio::CodedOutputStream os(&ss); - os.WriteRaw(reinterpret_cast<const char *>(&net_len), sizeof(net_len)); - - uint8_t *buf = os.GetDirectBufferForNBytesAndAdvance(len); - - std::for_each( - headers.begin(), headers.end(), [&buf](const pb::MessageLite *v) { - buf = pbio::CodedOutputStream::WriteVarint32ToArray(v->ByteSize(), buf); - buf = v->SerializeWithCachedSizesToArray(buf); - }); - - if (request) { - buf = pbio::CodedOutputStream::WriteVarint32ToArray(request->size(), buf); - buf = os.WriteStringToArray(*request, buf); - } -} - -static void SetRequestHeader(RpcEngine *engine, int call_id, - const std::string &method_name, - RpcRequestHeaderProto *rpc_header, - RequestHeaderProto *req_header) { - rpc_header->set_rpckind(RPC_PROTOCOL_BUFFER); - rpc_header->set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); - rpc_header->set_callid(call_id); - rpc_header->set_clientid(engine->client_name()); - - req_header->set_methodname(method_name); - req_header->set_declaringclassprotocolname(engine->protocol_name()); - req_header->set_clientprotocolversion(engine->protocol_version()); -} - -RpcConnection::~RpcConnection() {} - -RpcConnection::Request::Request(RpcConnection *parent, - const std::string &method_name, - const std::string &request, Handler &&handler) - : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), - handler_(std::move(handler)) { - RpcRequestHeaderProto rpc_header; - RequestHeaderProto req_header; - SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, - &req_header); - ConstructPacket(&payload_, {&rpc_header, &req_header}, &request); -} - -RpcConnection::Request::Request(RpcConnection *parent, - const std::string &method_name, - const pb::MessageLite *request, - Handler &&handler) - : call_id_(parent->engine_->NextCallId()), timer_(parent->io_service()), - handler_(std::move(handler)) { - RpcRequestHeaderProto rpc_header; - RequestHeaderProto req_header; - SetRequestHeader(parent->engine_, call_id_, method_name, &rpc_header, - &req_header); - ConstructPacket(&payload_, {&rpc_header, &req_header, request}, nullptr); -} - -void RpcConnection::Request::OnResponseArrived(pbio::CodedInputStream *is, - const Status &status) { - handler_(is, status); -} - -RpcConnection::RpcConnection(RpcEngine *engine) - : engine_(engine), resp_state_(kReadLength), resp_length_(0) {} - -::asio::io_service &RpcConnection::io_service() { - return engine_->io_service(); -} - -void RpcConnection::Start() { - io_service().post(std::bind(&RpcConnection::OnRecvCompleted, this, - ::asio::error_code(), 0)); -} - -void RpcConnection::FlushPendingRequests() { - io_service().post([this]() { - if (!request_over_the_wire_) { - OnSendCompleted(::asio::error_code(), 0); - } - }); -} - -void RpcConnection::HandleRpcResponse(const std::vector<char> &data) { - /* assumed to be called from a context that has already acquired the - * engine_state_lock */ - pbio::ArrayInputStream ar(&data[0], data.size()); - pbio::CodedInputStream in(&ar); - in.PushLimit(data.size()); - RpcResponseHeaderProto h; - ReadDelimitedPBMessage(&in, &h); - - auto req = RemoveFromRunningQueue(h.callid()); - if (!req) { - LOG_WARN() << "RPC response with Unknown call id " << h.callid(); - return; - } - - Status stat; - if (h.has_exceptionclassname()) { - stat = - Status::Exception(h.exceptionclassname().c_str(), h.errormsg().c_str()); - } - req->OnResponseArrived(&in, stat); -} - -void RpcConnection::HandleRpcTimeout(std::shared_ptr<Request> req, - const ::asio::error_code &ec) { - if (ec.value() == asio::error::operation_aborted) { - return; - } - - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - auto r = RemoveFromRunningQueue(req->call_id()); - if (!r) { - // The RPC might have been finished and removed from the queue - return; - } - - Status stat = ToStatus(ec ? ec : make_error_code(::asio::error::timed_out)); - - r->OnResponseArrived(nullptr, stat); -} - -std::shared_ptr<std::string> RpcConnection::PrepareHandshakePacket() { - static const char kHandshakeHeader[] = {'h', 'r', 'p', 'c', - RpcEngine::kRpcVersion, 0, 0}; - auto res = - std::make_shared<std::string>(kHandshakeHeader, sizeof(kHandshakeHeader)); - - RpcRequestHeaderProto h; - h.set_rpckind(RPC_PROTOCOL_BUFFER); - h.set_rpcop(RpcRequestHeaderProto::RPC_FINAL_PACKET); - h.set_callid(RpcEngine::kCallIdConnectionContext); - h.set_clientid(engine_->client_name()); - - IpcConnectionContextProto handshake; - handshake.set_protocol(engine_->protocol_name()); - ConstructPacket(res.get(), {&h, &handshake}, nullptr); - return res; -} - -void RpcConnection::AsyncRpc( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const Callback &handler) { - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - - auto wrapped_handler = - [resp, handler](pbio::CodedInputStream *is, const Status &status) { - if (status.ok()) { - ReadDelimitedPBMessage(is, resp.get()); - } - handler(status); - }; - - auto r = std::make_shared<Request>(this, method_name, req, - std::move(wrapped_handler)); - pending_requests_.push_back(r); - FlushPendingRequests(); -} - -void RpcConnection::AsyncRawRpc(const std::string &method_name, - const std::string &req, - std::shared_ptr<std::string> resp, - Callback &&handler) { - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - - auto wrapped_handler = - [this, resp, handler](pbio::CodedInputStream *is, const Status &status) { - if (status.ok()) { - uint32_t size = 0; - is->ReadVarint32(&size); - auto limit = is->PushLimit(size); - is->ReadString(resp.get(), limit); - is->PopLimit(limit); - } - handler(status); - }; - - auto r = std::make_shared<Request>(this, method_name, req, - std::move(wrapped_handler)); - pending_requests_.push_back(r); - FlushPendingRequests(); -} - -void RpcConnection::ClearAndDisconnect(const ::asio::error_code &ec) { - Shutdown(); - std::vector<std::shared_ptr<Request>> requests; - std::transform(requests_on_fly_.begin(), requests_on_fly_.end(), - std::back_inserter(requests), - std::bind(&RequestOnFlyMap::value_type::second, _1)); - requests_on_fly_.clear(); - requests.insert(requests.end(), - std::make_move_iterator(pending_requests_.begin()), - std::make_move_iterator(pending_requests_.end())); - pending_requests_.clear(); - for (const auto &req : requests) { - req->OnResponseArrived(nullptr, ToStatus(ec)); - } -} - -std::shared_ptr<RpcConnection::Request> -RpcConnection::RemoveFromRunningQueue(int call_id) { - auto it = requests_on_fly_.find(call_id); - if (it == requests_on_fly_.end()) { - return std::shared_ptr<Request>(); - } - - auto req = it->second; - requests_on_fly_.erase(it); - return req; -} -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h deleted file mode 100644 index 439a730..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_connection.h +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_RPC_RPC_CONNECTION_H_ -#define LIB_RPC_RPC_CONNECTION_H_ - -#include "rpc_engine.h" - -#include "common/logging.h" -#include "common/util.h" - -#include <asio/connect.hpp> -#include <asio/read.hpp> -#include <asio/write.hpp> - -namespace hdfs { - -template <class NextLayer> class RpcConnectionImpl : public RpcConnection { -public: - RpcConnectionImpl(RpcEngine *engine); - virtual void Connect(const ::asio::ip::tcp::endpoint &server, - Callback &&handler) override; - virtual void Handshake(Callback &&handler) override; - virtual void Shutdown() override; - virtual void OnSendCompleted(const ::asio::error_code &ec, - size_t transferred) override; - virtual void OnRecvCompleted(const ::asio::error_code &ec, - size_t transferred) override; - - NextLayer &next_layer() { return next_layer_; } -private: - const Options options_; - NextLayer next_layer_; -}; - -template <class NextLayer> -RpcConnectionImpl<NextLayer>::RpcConnectionImpl(RpcEngine *engine) - : RpcConnection(engine), options_(engine->options()), - next_layer_(engine->io_service()) {} - -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::Connect( - const ::asio::ip::tcp::endpoint &server, Callback &&handler) { - next_layer_.async_connect(server, - [handler](const ::asio::error_code &ec) { - handler(ToStatus(ec)); - }); -} - -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::Handshake(Callback &&handler) { - auto handshake_packet = PrepareHandshakePacket(); - ::asio::async_write( - next_layer_, asio::buffer(*handshake_packet), - [handshake_packet, handler](const ::asio::error_code &ec, size_t) { - handler(ToStatus(ec)); - }); -} - -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::OnSendCompleted(const ::asio::error_code &ec, - size_t) { - using std::placeholders::_1; - using std::placeholders::_2; - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - - request_over_the_wire_.reset(); - if (ec) { - // Current RPC has failed -- abandon the - // connection and do proper clean up - ClearAndDisconnect(ec); - return; - } - - if (!pending_requests_.size()) { - return; - } - - std::shared_ptr<Request> req = pending_requests_.front(); - pending_requests_.erase(pending_requests_.begin()); - requests_on_fly_[req->call_id()] = req; - request_over_the_wire_ = req; - - req->timer().expires_from_now( - std::chrono::milliseconds(options_.rpc_timeout)); - req->timer().async_wait(std::bind( - &RpcConnectionImpl<NextLayer>::HandleRpcTimeout, this, req, _1)); - - asio::async_write( - next_layer_, asio::buffer(req->payload()), - std::bind(&RpcConnectionImpl<NextLayer>::OnSendCompleted, this, _1, _2)); -} - -template <class NextLayer> -void RpcConnectionImpl<NextLayer>::OnRecvCompleted(const ::asio::error_code &ec, - size_t) { - using std::placeholders::_1; - using std::placeholders::_2; - std::lock_guard<std::mutex> state_lock(engine_state_lock_); - - switch (ec.value()) { - case 0: - // No errors - break; - case asio::error::operation_aborted: - // The event loop has been shut down. Ignore the error. - return; - default: - LOG_WARN() << "Network error during RPC: " << ec.message(); - ClearAndDisconnect(ec); - return; - } - - if (resp_state_ == kReadLength) { - resp_state_ = kReadContent; - auto buf = ::asio::buffer(reinterpret_cast<char *>(&resp_length_), - sizeof(resp_length_)); - asio::async_read(next_layer_, buf, - std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted, - this, _1, _2)); - - } else if (resp_state_ == kReadContent) { - resp_state_ = kParseResponse; - resp_length_ = ntohl(resp_length_); - resp_data_.resize(resp_length_); - asio::async_read(next_layer_, ::asio::buffer(resp_data_), - std::bind(&RpcConnectionImpl<NextLayer>::OnRecvCompleted, - this, _1, _2)); - - } else if (resp_state_ == kParseResponse) { - resp_state_ = kReadLength; - HandleRpcResponse(resp_data_); - resp_data_.clear(); - Start(); - } -} - -template <class NextLayer> void RpcConnectionImpl<NextLayer>::Shutdown() { - next_layer_.cancel(); - next_layer_.close(); -} -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc deleted file mode 100644 index 83721a7..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.cc +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rpc_engine.h" -#include "rpc_connection.h" -#include "common/util.h" - -#include <openssl/rand.h> - -#include <sstream> -#include <future> - -namespace hdfs { - -RpcEngine::RpcEngine(::asio::io_service *io_service, const Options &options, - const std::string &client_name, const char *protocol_name, - int protocol_version) - : io_service_(io_service), options_(options), client_name_(client_name), - protocol_name_(protocol_name), protocol_version_(protocol_version), - call_id_(0) { -} - -void RpcEngine::Connect(const ::asio::ip::tcp::endpoint &server, - const std::function<void(const Status &)> &handler) { - conn_.reset(new RpcConnectionImpl<::asio::ip::tcp::socket>(this)); - conn_->Connect(server, [this, handler](const Status &stat) { - if (!stat.ok()) { - handler(stat); - } else { - conn_->Handshake([handler](const Status &s) { handler(s); }); - } - }); -} - -void RpcEngine::Start() { conn_->Start(); } - -void RpcEngine::Shutdown() { - io_service_->post([this]() { conn_->Shutdown(); }); -} - -void RpcEngine::TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn) { - conn_.reset(conn->release()); -} - -void RpcEngine::AsyncRpc( - const std::string &method_name, const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp, - const std::function<void(const Status &)> &handler) { - conn_->AsyncRpc(method_name, req, resp, handler); -} - -Status -RpcEngine::Rpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp) { - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future(stat->get_future()); - AsyncRpc(method_name, req, resp, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} - -Status RpcEngine::RawRpc(const std::string &method_name, const std::string &req, - std::shared_ptr<std::string> resp) { - auto stat = std::make_shared<std::promise<Status>>(); - std::future<Status> future(stat->get_future()); - conn_->AsyncRawRpc(method_name, req, resp, - [stat](const Status &status) { stat->set_value(status); }); - return future.get(); -} - -std::string RpcEngine::GetRandomClientName() { - unsigned char buf[6] = { - 0, - }; - RAND_pseudo_bytes(buf, sizeof(buf)); - - std::stringstream ss; - ss << "libhdfs++_" - << Base64Encode(std::string(reinterpret_cast<char *>(buf), sizeof(buf))); - return ss.str(); -} -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h deleted file mode 100644 index ee04fd5..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/lib/rpc/rpc_engine.h +++ /dev/null @@ -1,172 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIB_RPC_RPC_ENGINE_H_ -#define LIB_RPC_RPC_ENGINE_H_ - -#include "libhdfspp/options.h" -#include "libhdfspp/status.h" - -#include <google/protobuf/message_lite.h> - -#include <asio/ip/tcp.hpp> -#include <asio/deadline_timer.hpp> - -#include <atomic> -#include <memory> -#include <unordered_map> -#include <vector> -#include <mutex> - -namespace hdfs { - -class RpcEngine; -class RpcConnection { -public: - typedef std::function<void(const Status &)> Callback; - virtual ~RpcConnection(); - RpcConnection(RpcEngine *engine); - virtual void Connect(const ::asio::ip::tcp::endpoint &server, - Callback &&handler) = 0; - virtual void Handshake(Callback &&handler) = 0; - virtual void Shutdown() = 0; - - void Start(); - void AsyncRpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - std::shared_ptr<::google::protobuf::MessageLite> resp, - const Callback &handler); - - void AsyncRawRpc(const std::string &method_name, const std::string &request, - std::shared_ptr<std::string> resp, Callback &&handler); - -protected: - class Request; - RpcEngine *const engine_; - virtual void OnSendCompleted(const ::asio::error_code &ec, - size_t transferred) = 0; - virtual void OnRecvCompleted(const ::asio::error_code &ec, - size_t transferred) = 0; - - ::asio::io_service &io_service(); - std::shared_ptr<std::string> PrepareHandshakePacket(); - static std::string - SerializeRpcRequest(const std::string &method_name, - const ::google::protobuf::MessageLite *req); - void HandleRpcResponse(const std::vector<char> &data); - void HandleRpcTimeout(std::shared_ptr<Request> req, - const ::asio::error_code &ec); - void FlushPendingRequests(); - void ClearAndDisconnect(const ::asio::error_code &ec); - std::shared_ptr<Request> RemoveFromRunningQueue(int call_id); - - enum ResponseState { - kReadLength, - kReadContent, - kParseResponse, - } resp_state_; - unsigned resp_length_; - std::vector<char> resp_data_; - - class Request { - public: - typedef std::function<void(::google::protobuf::io::CodedInputStream *is, - const Status &status)> Handler; - Request(RpcConnection *parent, const std::string &method_name, - const std::string &request, Handler &&callback); - Request(RpcConnection *parent, const std::string &method_name, - const ::google::protobuf::MessageLite *request, Handler &&callback); - - int call_id() const { return call_id_; } - ::asio::deadline_timer &timer() { return timer_; } - const std::string &payload() const { return payload_; } - void OnResponseArrived(::google::protobuf::io::CodedInputStream *is, - const Status &status); - - private: - const int call_id_; - ::asio::deadline_timer timer_; - std::string payload_; - Handler handler_; - }; - - // The request being sent over the wire - std::shared_ptr<Request> request_over_the_wire_; - // Requests to be sent over the wire - std::vector<std::shared_ptr<Request>> pending_requests_; - // Requests that are waiting for responses - typedef std::unordered_map<int, std::shared_ptr<Request>> RequestOnFlyMap; - RequestOnFlyMap requests_on_fly_; - // Lock for mutable parts of this class that need to be thread safe - std::mutex engine_state_lock_; -}; - -class RpcEngine { -public: - enum { kRpcVersion = 9 }; - enum { - kCallIdAuthorizationFailed = -1, - kCallIdInvalid = -2, - kCallIdConnectionContext = -3, - kCallIdPing = -4 - }; - - RpcEngine(::asio::io_service *io_service, const Options &options, - const std::string &client_name, const char *protocol_name, - int protocol_version); - - void AsyncRpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp, - const std::function<void(const Status &)> &handler); - - Status Rpc(const std::string &method_name, - const ::google::protobuf::MessageLite *req, - const std::shared_ptr<::google::protobuf::MessageLite> &resp); - /** - * Send raw bytes as RPC payload. This is intended to be used in JNI - * bindings only. - **/ - Status RawRpc(const std::string &method_name, const std::string &req, - std::shared_ptr<std::string> resp); - void Connect(const ::asio::ip::tcp::endpoint &server, - const std::function<void(const Status &)> &handler); - void Start(); - void Shutdown(); - void TEST_SetRpcConnection(std::unique_ptr<RpcConnection> *conn); - - int NextCallId() { return ++call_id_; } - - const std::string &client_name() const { return client_name_; } - const std::string &protocol_name() const { return protocol_name_; } - int protocol_version() const { return protocol_version_; } - ::asio::io_service &io_service() { return *io_service_; } - const Options &options() { return options_; } - static std::string GetRandomClientName(); - -private: - ::asio::io_service *io_service_; - Options options_; - const std::string client_name_; - const std::string protocol_name_; - const int protocol_version_; - std::atomic_int call_id_; - std::unique_ptr<RpcConnection> conn_; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt deleted file mode 100644 index eca878e..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/CMakeLists.txt +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -add_library(test_common OBJECT mock_connection.cc) - -set(PROTOBUF_IMPORT_DIRS ${PROTO_HADOOP_TEST_DIR}) - -protobuf_generate_cpp(PROTO_TEST_SRCS PROTO_TEST_HDRS - ${PROTO_HADOOP_TEST_DIR}/test.proto - ${PROTO_HADOOP_TEST_DIR}/test_rpc_service.proto -) - -add_executable(remote_block_reader_test remote_block_reader_test.cc $<TARGET_OBJECTS:test_common>) -target_link_libraries(remote_block_reader_test reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) -add_test(remote_block_reader remote_block_reader_test) - -add_executable(sasl_digest_md5_test sasl_digest_md5_test.cc) -target_link_libraries(sasl_digest_md5_test common ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) -add_test(sasl_digest_md5 sasl_digest_md5_test) - -add_executable(inputstream_test inputstream_test.cc) -target_link_libraries(inputstream_test fs rpc reader proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) -add_test(inputstream inputstream_test) - -include_directories(${CMAKE_CURRENT_BINARY_DIR}) -add_executable(rpc_engine_test rpc_engine_test.cc ${PROTO_TEST_SRCS} ${PROTO_TEST_HDRS} $<TARGET_OBJECTS:test_common>) -target_link_libraries(rpc_engine_test rpc proto common ${PROTOBUF_LIBRARIES} ${OPENSSL_LIBRARIES} gmock_main ${CMAKE_THREAD_LIBS_INIT}) -add_test(rpc_engine rpc_engine_test) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc deleted file mode 100644 index aa95256..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/inputstream_test.cc +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "fs/filesystem.h" -#include <gmock/gmock.h> - -using hadoop::common::TokenProto; -using hadoop::hdfs::DatanodeInfoProto; -using hadoop::hdfs::DatanodeIDProto; -using hadoop::hdfs::ExtendedBlockProto; -using hadoop::hdfs::LocatedBlockProto; -using hadoop::hdfs::LocatedBlocksProto; - -using ::testing::_; -using ::testing::InvokeArgument; -using ::testing::Return; - -using namespace hdfs; - -namespace hdfs { - -class MockReader { -public: - virtual ~MockReader() {} - MOCK_METHOD2( - async_read_some, - void(const asio::mutable_buffers_1 &, - const std::function<void(const Status &, size_t transferred)> &)); - - MOCK_METHOD6(async_connect, - void(const std::string &, TokenProto *, ExtendedBlockProto *, - uint64_t, uint64_t, - const std::function<void(const Status &)> &)); -}; - -template <class Trait> struct MockBlockReaderTrait { - typedef MockReader Reader; - struct State { - MockReader reader_; - size_t transferred_; - Reader *reader() { return &reader_; } - size_t *transferred() { return &transferred_; } - const size_t *transferred() const { return &transferred_; } - }; - - static continuation::Pipeline<State> * - CreatePipeline(::asio::io_service *, const DatanodeInfoProto &) { - auto m = continuation::Pipeline<State>::Create(); - *m->state().transferred() = 0; - Trait::InitializeMockReader(m->state().reader()); - return m; - } -}; -} - -TEST(InputStreamTest, TestReadSingleTrunk) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); - } - }; - - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(sizeof(buf), read); - read = 0; -} - -TEST(InputStreamTest, TestReadMultipleTrunk) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .Times(4) - .WillRepeatedly(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)); - } - }; - - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(sizeof(buf), read); - read = 0; -} - -TEST(InputStreamTest, TestReadError) { - LocatedBlocksProto blocks; - LocatedBlockProto block; - DatanodeInfoProto dn; - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf) / 4)) - .WillOnce(InvokeArgument<1>(Status::Error("error"), 0)); - } - }; - - is.AsyncReadBlock<MockBlockReaderTrait<Trait>>( - "client", block, dn, 0, asio::buffer(buf, sizeof(buf)), - [&stat, &read](const Status &status, const std::string &, - size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_FALSE(stat.ok()); - ASSERT_EQ(sizeof(buf) / 4 * 3, read); - read = 0; -} - -TEST(InputStreamTest, TestExcludeDataNode) { - LocatedBlocksProto blocks; - LocatedBlockProto *block = blocks.add_blocks(); - ExtendedBlockProto *b = block->mutable_b(); - b->set_poolid(""); - b->set_blockid(1); - b->set_generationstamp(1); - b->set_numbytes(4096); - - DatanodeInfoProto *di = block->add_locs(); - DatanodeIDProto *dnid = di->mutable_id(); - dnid->set_datanodeuuid("foo"); - - char buf[4096] = { - 0, - }; - IoServiceImpl io_service; - Options options; - FileSystemImpl fs(&io_service, options); - InputStreamImpl is(&fs, &blocks); - Status stat; - size_t read = 0; - struct Trait { - static void InitializeMockReader(MockReader *reader) { - EXPECT_CALL(*reader, async_connect(_, _, _, _, _, _)) - .WillOnce(InvokeArgument<5>(Status::OK())); - - EXPECT_CALL(*reader, async_read_some(_, _)) - .WillOnce(InvokeArgument<1>(Status::OK(), sizeof(buf))); - } - }; - - - std::set<std::string> excluded_dn({"foo"}); - is.AsyncPreadSome(0, asio::buffer(buf, sizeof(buf)), excluded_dn, - [&stat, &read](const Status &status, const std::string &, size_t transferred) { - stat = status; - read = transferred; - }); - ASSERT_EQ(static_cast<int>(std::errc::resource_unavailable_try_again), stat.code()); - ASSERT_EQ(0UL, read); -} - -int main(int argc, char *argv[]) { - // The following line must be executed to initialize Google Mock - // (and Google Test) before running the tests. - ::testing::InitGoogleMock(&argc, argv); - return RUN_ALL_TESTS(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc deleted file mode 100644 index 93a3099..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.cc +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "mock_connection.h" - -namespace hdfs { - -MockConnectionBase::MockConnectionBase(::asio::io_service *io_service) - : io_service_(io_service) -{} - -MockConnectionBase::~MockConnectionBase() {} - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h deleted file mode 100644 index 8c0ef8c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/mock_connection.h +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#ifndef LIBHDFSPP_TEST_MOCK_CONNECTION_H_ -#define LIBHDFSPP_TEST_MOCK_CONNECTION_H_ - -#include <asio/error_code.hpp> -#include <asio/buffer.hpp> -#include <asio/streambuf.hpp> -#include <asio/io_service.hpp> - -#include <gmock/gmock.h> - -namespace hdfs { - -class MockConnectionBase { -public: - MockConnectionBase(::asio::io_service *io_service); - virtual ~MockConnectionBase(); - typedef std::pair<asio::error_code, std::string> ProducerResult; - template <class MutableBufferSequence, class Handler> - void async_read_some(const MutableBufferSequence &buf, Handler &&handler) { - if (produced_.size() == 0) { - ProducerResult r = Produce(); - if (r.first) { - io_service_->post(std::bind(handler, r.first, 0)); - return; - } - asio::mutable_buffers_1 data = produced_.prepare(r.second.size()); - asio::buffer_copy(data, asio::buffer(r.second)); - produced_.commit(r.second.size()); - } - - size_t len = std::min(asio::buffer_size(buf), produced_.size()); - asio::buffer_copy(buf, produced_.data()); - produced_.consume(len); - io_service_->post(std::bind(handler, asio::error_code(), len)); - } - - template <class ConstBufferSequence, class Handler> - void async_write_some(const ConstBufferSequence &buf, Handler &&handler) { - // CompletionResult res = OnWrite(buf); - io_service_->post(std::bind(handler, asio::error_code(), asio::buffer_size(buf))); - } - -protected: - virtual ProducerResult Produce() = 0; - ::asio::io_service *io_service_; - -private: - asio::streambuf produced_; -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc deleted file mode 100644 index 388a106..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/remote_block_reader_test.cc +++ /dev/null @@ -1,283 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "mock_connection.h" - -#include "datatransfer.pb.h" -#include "common/util.h" -#include "reader/block_reader.h" - -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl.h> -#include <gmock/gmock.h> -#include <gtest/gtest.h> - -using namespace hdfs; - -using ::hadoop::common::TokenProto; -using ::hadoop::hdfs::BlockOpResponseProto; -using ::hadoop::hdfs::ChecksumProto; -using ::hadoop::hdfs::DataTransferEncryptorMessageProto; -using ::hadoop::hdfs::ExtendedBlockProto; -using ::hadoop::hdfs::PacketHeaderProto; -using ::hadoop::hdfs::ReadOpChecksumInfoProto; - -using ::asio::buffer; -using ::asio::error_code; -using ::asio::mutable_buffers_1; -using ::testing::Return; -using std::make_pair; -using std::string; - -namespace pb = ::google::protobuf; -namespace pbio = pb::io; - -namespace hdfs { - -class MockDNConnection : public MockConnectionBase { -public: - MockDNConnection(::asio::io_service &io_service) - : MockConnectionBase(&io_service) {} - MOCK_METHOD0(Produce, ProducerResult()); -}; -} - -static inline string ToDelimitedString(const pb::MessageLite *msg) { - string res; - res.reserve(hdfs::DelimitedPBMessageSize(msg)); - pbio::StringOutputStream os(&res); - pbio::CodedOutputStream out(&os); - out.WriteVarint32(msg->ByteSize()); - msg->SerializeToCodedStream(&out); - return res; -} - -static inline std::pair<error_code, string> Produce(const std::string &s) { - return make_pair(error_code(), s); -} - -static inline std::pair<error_code, string> -ProducePacket(const std::string &data, const std::string &checksum, - int offset_in_block, int seqno, bool last_packet) { - PacketHeaderProto proto; - proto.set_datalen(data.size()); - proto.set_offsetinblock(offset_in_block); - proto.set_seqno(seqno); - proto.set_lastpacketinblock(last_packet); - - char prefix[6]; - *reinterpret_cast<unsigned *>(prefix) = - htonl(data.size() + checksum.size() + sizeof(int)); - *reinterpret_cast<short *>(prefix + sizeof(int)) = htons(proto.ByteSize()); - std::string payload(prefix, sizeof(prefix)); - payload.reserve(payload.size() + proto.ByteSize() + checksum.size() + - data.size()); - proto.AppendToString(&payload); - payload += checksum; - payload += data; - return std::make_pair(error_code(), std::move(payload)); -} - -template <class Stream = MockDNConnection, class Handler> -static std::shared_ptr<RemoteBlockReader<Stream>> -ReadContent(Stream *conn, TokenProto *token, const ExtendedBlockProto &block, - uint64_t length, uint64_t offset, const mutable_buffers_1 &buf, - const Handler &handler) { - BlockReaderOptions options; - auto reader = std::make_shared<RemoteBlockReader<Stream>>(options, conn); - Status result; - reader->async_connect("libhdfs++", token, &block, length, offset, - [buf, reader, handler](const Status &stat) { - if (!stat.ok()) { - handler(stat, 0); - } else { - reader->async_read_some(buf, handler); - } - }); - return reader; -} - -TEST(RemoteBlockReaderTest, TestReadWholeBlock) { - static const size_t kChunkSize = 512; - static const string kChunkData(kChunkSize, 'a'); - ::asio::io_service io_service; - MockDNConnection conn(io_service); - BlockOpResponseProto block_op_resp; - - block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - EXPECT_CALL(conn, Produce()) - .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) - .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); - - ExtendedBlockProto block; - block.set_poolid("foo"); - block.set_blockid(0); - block.set_generationstamp(0); - - std::string data(kChunkSize, 0); - ReadContent(&conn, nullptr, block, kChunkSize, 0, - buffer(const_cast<char *>(data.c_str()), data.size()), - [&data, &io_service](const Status &stat, size_t transferred) { - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); - io_service.stop(); - }); - io_service.run(); -} - -TEST(RemoteBlockReaderTest, TestReadWithinChunk) { - static const size_t kChunkSize = 1024; - static const size_t kLength = kChunkSize / 4 * 3; - static const size_t kOffset = kChunkSize / 4; - static const string kChunkData = string(kOffset, 'a') + string(kLength, 'b'); - - ::asio::io_service io_service; - MockDNConnection conn(io_service); - BlockOpResponseProto block_op_resp; - ReadOpChecksumInfoProto *checksum_info = - block_op_resp.mutable_readopchecksuminfo(); - checksum_info->set_chunkoffset(0); - ChecksumProto *checksum = checksum_info->mutable_checksum(); - checksum->set_type(::hadoop::hdfs::ChecksumTypeProto::CHECKSUM_NULL); - checksum->set_bytesperchecksum(512); - block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - - EXPECT_CALL(conn, Produce()) - .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) - .WillOnce(Return(ProducePacket(kChunkData, "", kOffset, 1, true))); - - ExtendedBlockProto block; - block.set_poolid("foo"); - block.set_blockid(0); - block.set_generationstamp(0); - - string data(kLength, 0); - ReadContent(&conn, nullptr, block, data.size(), kOffset, - buffer(const_cast<char *>(data.c_str()), data.size()), - [&data, &io_service](const Status &stat, size_t transferred) { - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kLength, transferred); - ASSERT_EQ(kChunkData.substr(kOffset, kLength), data); - io_service.stop(); - }); - io_service.run(); -} - -TEST(RemoteBlockReaderTest, TestReadMultiplePacket) { - static const size_t kChunkSize = 1024; - static const string kChunkData(kChunkSize, 'a'); - - ::asio::io_service io_service; - MockDNConnection conn(io_service); - BlockOpResponseProto block_op_resp; - block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - - EXPECT_CALL(conn, Produce()) - .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) - .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, false))) - .WillOnce(Return(ProducePacket(kChunkData, "", kChunkSize, 2, true))); - - ExtendedBlockProto block; - block.set_poolid("foo"); - block.set_blockid(0); - block.set_generationstamp(0); - - string data(kChunkSize, 0); - mutable_buffers_1 buf = buffer(const_cast<char *>(data.c_str()), data.size()); - BlockReaderOptions options; - auto reader = std::make_shared<RemoteBlockReader<MockDNConnection> >(options, &conn); - Status result; - reader->async_connect( - "libhdfs++", nullptr, &block, data.size(), 0, - [buf, reader, &data, &io_service](const Status &stat) { - ASSERT_TRUE(stat.ok()); - reader->async_read_some( - buf, [buf, reader, &data, &io_service](const Status &stat, size_t transferred) { - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); - data.clear(); - data.resize(kChunkSize); - transferred = 0; - reader->async_read_some( - buf, [&data,&io_service](const Status &stat, size_t transferred) { - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); - io_service.stop(); - }); - }); - }); - io_service.run(); -} - -TEST(RemoteBlockReaderTest, TestSaslConnection) { - static const size_t kChunkSize = 512; - static const string kChunkData(kChunkSize, 'a'); - static const string kAuthPayload = "realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" - "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\"," - "charset=utf-8,algorithm=md5-sess"; - ::asio::io_service io_service; - MockDNConnection conn(io_service); - BlockOpResponseProto block_op_resp; - block_op_resp.set_status(::hadoop::hdfs::Status::SUCCESS); - - DataTransferEncryptorMessageProto sasl_resp0, sasl_resp1; - sasl_resp0.set_status( - ::hadoop::hdfs:: - DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); - sasl_resp0.set_payload(kAuthPayload); - sasl_resp1.set_status( - ::hadoop::hdfs:: - DataTransferEncryptorMessageProto_DataTransferEncryptorStatus_SUCCESS); - - EXPECT_CALL(conn, Produce()) - .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp0)))) - .WillOnce(Return(Produce(ToDelimitedString(&sasl_resp1)))) - .WillOnce(Return(Produce(ToDelimitedString(&block_op_resp)))) - .WillOnce(Return(ProducePacket(kChunkData, "", 0, 1, true))); - - DataTransferSaslStream<MockDNConnection> sasl_conn(&conn, "foo", "bar"); - ExtendedBlockProto block; - block.set_poolid("foo"); - block.set_blockid(0); - block.set_generationstamp(0); - - std::string data(kChunkSize, 0); - sasl_conn.Handshake([&sasl_conn, &block, &data, &io_service]( - const Status &s) { - ASSERT_TRUE(s.ok()); - ReadContent(&sasl_conn, nullptr, block, kChunkSize, 0, - buffer(const_cast<char *>(data.c_str()), data.size()), - [&data, &io_service](const Status &stat, size_t transferred) { - ASSERT_TRUE(stat.ok()); - ASSERT_EQ(kChunkSize, transferred); - ASSERT_EQ(kChunkData, data); - io_service.stop(); - }); - }); - io_service.run(); -} - -int main(int argc, char *argv[]) { - // The following line must be executed to initialize Google Mock - // (and Google Test) before running the tests. - ::testing::InitGoogleMock(&argc, argv); - return RUN_ALL_TESTS(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc deleted file mode 100644 index 8bce1b9..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/rpc_engine_test.cc +++ /dev/null @@ -1,179 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include "mock_connection.h" -#include "test.pb.h" -#include "RpcHeader.pb.h" -#include "rpc/rpc_connection.h" - -#include <google/protobuf/io/coded_stream.h> - -#include <gmock/gmock.h> - -using ::hadoop::common::RpcResponseHeaderProto; -using ::hadoop::common::EmptyRequestProto; -using ::hadoop::common::EmptyResponseProto; -using ::hadoop::common::EchoRequestProto; -using ::hadoop::common::EchoResponseProto; - -using ::asio::error_code; - -using ::testing::Return; - -using ::std::make_pair; -using ::std::string; - -namespace pb = ::google::protobuf; -namespace pbio = ::google::protobuf::io; - -namespace hdfs { - -class MockRPCConnection : public MockConnectionBase { -public: - MockRPCConnection(::asio::io_service &io_service) - : MockConnectionBase(&io_service) {} - MOCK_METHOD0(Produce, ProducerResult()); - template <class Endpoint, class Callback> - void async_connect(const Endpoint &, Callback &&handler) { - handler(::asio::error_code()); - } - void cancel() {} - void close() {} -}; - -static inline std::pair<error_code, string> -RpcResponse(const RpcResponseHeaderProto &h, const std::string &data, - const ::asio::error_code &ec = error_code()) { - uint32_t payload_length = - pbio::CodedOutputStream::VarintSize32(h.ByteSize()) + - pbio::CodedOutputStream::VarintSize32(data.size()) + h.ByteSize() + - data.size(); - - std::string res; - res.resize(sizeof(uint32_t) + payload_length); - uint8_t *buf = reinterpret_cast<uint8_t *>(const_cast<char *>(res.c_str())); - - buf = pbio::CodedOutputStream::WriteLittleEndian32ToArray( - htonl(payload_length), buf); - buf = pbio::CodedOutputStream::WriteVarint32ToArray(h.ByteSize(), buf); - buf = h.SerializeWithCachedSizesToArray(buf); - buf = pbio::CodedOutputStream::WriteVarint32ToArray(data.size(), buf); - buf = pbio::CodedOutputStream::WriteStringToArray(data, buf); - - return std::make_pair(ec, std::move(res)); -} -} - -using namespace hdfs; - -TEST(RpcEngineTest, TestRoundTrip) { - ::asio::io_service io_service; - Options options; - RpcEngine engine(&io_service, options, "foo", "protocol", 1); - RpcConnectionImpl<MockRPCConnection> *conn = - new RpcConnectionImpl<MockRPCConnection>(&engine); - EchoResponseProto server_resp; - server_resp.set_message("foo"); - - RpcResponseHeaderProto h; - h.set_callid(1); - h.set_status(RpcResponseHeaderProto::SUCCESS); - EXPECT_CALL(conn->next_layer(), Produce()) - .WillOnce(Return(RpcResponse(h, server_resp.SerializeAsString()))); - - std::unique_ptr<RpcConnection> conn_ptr(conn); - engine.TEST_SetRpcConnection(&conn_ptr); - - EchoRequestProto req; - req.set_message("foo"); - std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); - engine.AsyncRpc("test", &req, resp, [resp, &io_service](const Status &stat) { - ASSERT_TRUE(stat.ok()); - ASSERT_EQ("foo", resp->message()); - io_service.stop(); - }); - conn->Start(); - io_service.run(); -} - -TEST(RpcEngineTest, TestConnectionReset) { - ::asio::io_service io_service; - Options options; - RpcEngine engine(&io_service, options, "foo", "protocol", 1); - RpcConnectionImpl<MockRPCConnection> *conn = - new RpcConnectionImpl<MockRPCConnection>(&engine); - - RpcResponseHeaderProto h; - h.set_callid(1); - h.set_status(RpcResponseHeaderProto::SUCCESS); - EXPECT_CALL(conn->next_layer(), Produce()) - .WillOnce(Return(RpcResponse( - h, "", make_error_code(::asio::error::connection_reset)))); - - std::unique_ptr<RpcConnection> conn_ptr(conn); - engine.TEST_SetRpcConnection(&conn_ptr); - - EchoRequestProto req; - req.set_message("foo"); - std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); - - engine.AsyncRpc("test", &req, resp, [&io_service](const Status &stat) { - ASSERT_FALSE(stat.ok()); - }); - - engine.AsyncRpc("test", &req, resp, [&io_service](const Status &stat) { - io_service.stop(); - ASSERT_FALSE(stat.ok()); - }); - conn->Start(); - io_service.run(); -} - -TEST(RpcEngineTest, TestTimeout) { - ::asio::io_service io_service; - Options options; - options.rpc_timeout = 1; - RpcEngine engine(&io_service, options, "foo", "protocol", 1); - RpcConnectionImpl<MockRPCConnection> *conn = - new RpcConnectionImpl<MockRPCConnection>(&engine); - - EXPECT_CALL(conn->next_layer(), Produce()).Times(0); - - std::unique_ptr<RpcConnection> conn_ptr(conn); - engine.TEST_SetRpcConnection(&conn_ptr); - - EchoRequestProto req; - req.set_message("foo"); - std::shared_ptr<EchoResponseProto> resp(new EchoResponseProto()); - engine.AsyncRpc("test", &req, resp, [resp, &io_service](const Status &stat) { - io_service.stop(); - ASSERT_FALSE(stat.ok()); - }); - - ::asio::deadline_timer timer(io_service); - timer.expires_from_now(std::chrono::milliseconds(options.rpc_timeout * 2)); - timer.async_wait(std::bind(&RpcConnection::Start, conn)); - io_service.run(); -} - -int main(int argc, char *argv[]) { - // The following line must be executed to initialize Google Mock - // (and Google Test) before running the tests. - ::testing::InitGoogleMock(&argc, argv); - return RUN_ALL_TESTS(); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc deleted file mode 100644 index 0797853..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/tests/sasl_digest_md5_test.cc +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "common/sasl_authenticator.h" - -#include <gtest/gtest.h> - -namespace hdfs { - -/** - * Testing whether the authenticator generates the MD5 digest correctly. - **/ -TEST(DigestMD5AuthenticatorTest, TestResponse) { - const std::string username = "igFLnEx4OIx5PZWHAAAABGhtYWkAAAAoQlAtMTM3MDQ2OTk" - "zLTE5Mi4xNjguMS4yMjctMTQyNDIyMDM4MTM2M4xAAAABAQ" - "RSRUFE"; - const std::string password = "K5IFUibAynVVrApeCXLrBk9Sro8="; - DigestMD5Authenticator auth(username, password, true); - auth.cnonce_ = "KQlJwBDTseCHpAkFLZls4WcAktp6r5wTzje5feLY"; - std::string result; - Status status = - auth.EvaluateResponse("realm=\"0\",nonce=\"+GAWc+O6yEAWpew/" - "qKah8qh4QZLoOLCDcTtEKhlS\",qop=\"auth\",charset=" - "utf-8,algorithm=md5-sess", - &result); - ASSERT_TRUE(status.ok()); - ASSERT_TRUE(result.find("response=3a286c2c385b92a06ebc66d58b8c4330") != - std::string::npos); -} -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING deleted file mode 100644 index e86a381..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/COPYING +++ /dev/null @@ -1,4 +0,0 @@ -Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com) - -Distributed under the Boost Software License, Version 1.0. (See accompanying -file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp deleted file mode 100644 index 1f47840..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio.hpp +++ /dev/null @@ -1,122 +0,0 @@ -// -// asio.hpp -// ~~~~~~~~ -// -// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#ifndef ASIO_HPP -#define ASIO_HPP - -#if defined(_MSC_VER) && (_MSC_VER >= 1200) -# pragma once -#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) - -#include "asio/async_result.hpp" -#include "asio/basic_datagram_socket.hpp" -#include "asio/basic_deadline_timer.hpp" -#include "asio/basic_io_object.hpp" -#include "asio/basic_raw_socket.hpp" -#include "asio/basic_seq_packet_socket.hpp" -#include "asio/basic_serial_port.hpp" -#include "asio/basic_signal_set.hpp" -#include "asio/basic_socket_acceptor.hpp" -#include "asio/basic_socket_iostream.hpp" -#include "asio/basic_socket_streambuf.hpp" -#include "asio/basic_stream_socket.hpp" -#include "asio/basic_streambuf.hpp" -#include "asio/basic_waitable_timer.hpp" -#include "asio/buffer.hpp" -#include "asio/buffered_read_stream_fwd.hpp" -#include "asio/buffered_read_stream.hpp" -#include "asio/buffered_stream_fwd.hpp" -#include "asio/buffered_stream.hpp" -#include "asio/buffered_write_stream_fwd.hpp" -#include "asio/buffered_write_stream.hpp" -#include "asio/buffers_iterator.hpp" -#include "asio/completion_condition.hpp" -#include "asio/connect.hpp" -#include "asio/coroutine.hpp" -#include "asio/datagram_socket_service.hpp" -#include "asio/deadline_timer_service.hpp" -#include "asio/deadline_timer.hpp" -#include "asio/error.hpp" -#include "asio/error_code.hpp" -#include "asio/generic/basic_endpoint.hpp" -#include "asio/generic/datagram_protocol.hpp" -#include "asio/generic/raw_protocol.hpp" -#include "asio/generic/seq_packet_protocol.hpp" -#include "asio/generic/stream_protocol.hpp" -#include "asio/handler_alloc_hook.hpp" -#include "asio/handler_continuation_hook.hpp" -#include "asio/handler_invoke_hook.hpp" -#include "asio/handler_type.hpp" -#include "asio/io_service.hpp" -#include "asio/ip/address.hpp" -#include "asio/ip/address_v4.hpp" -#include "asio/ip/address_v6.hpp" -#include "asio/ip/basic_endpoint.hpp" -#include "asio/ip/basic_resolver.hpp" -#include "asio/ip/basic_resolver_entry.hpp" -#include "asio/ip/basic_resolver_iterator.hpp" -#include "asio/ip/basic_resolver_query.hpp" -#include "asio/ip/host_name.hpp" -#include "asio/ip/icmp.hpp" -#include "asio/ip/multicast.hpp" -#include "asio/ip/resolver_query_base.hpp" -#include "asio/ip/resolver_service.hpp" -#include "asio/ip/tcp.hpp" -#include "asio/ip/udp.hpp" -#include "asio/ip/unicast.hpp" -#include "asio/ip/v6_only.hpp" -#include "asio/is_read_buffered.hpp" -#include "asio/is_write_buffered.hpp" -#include "asio/local/basic_endpoint.hpp" -#include "asio/local/connect_pair.hpp" -#include "asio/local/datagram_protocol.hpp" -#include "asio/local/stream_protocol.hpp" -#include "asio/placeholders.hpp" -#include "asio/posix/basic_descriptor.hpp" -#include "asio/posix/basic_stream_descriptor.hpp" -#include "asio/posix/descriptor_base.hpp" -#include "asio/posix/stream_descriptor.hpp" -#include "asio/posix/stream_descriptor_service.hpp" -#include "asio/raw_socket_service.hpp" -#include "asio/read.hpp" -#include "asio/read_at.hpp" -#include "asio/read_until.hpp" -#include "asio/seq_packet_socket_service.hpp" -#include "asio/serial_port.hpp" -#include "asio/serial_port_base.hpp" -#include "asio/serial_port_service.hpp" -#include "asio/signal_set.hpp" -#include "asio/signal_set_service.hpp" -#include "asio/socket_acceptor_service.hpp" -#include "asio/socket_base.hpp" -#include "asio/strand.hpp" -#include "asio/stream_socket_service.hpp" -#include "asio/streambuf.hpp" -#include "asio/system_error.hpp" -#include "asio/thread.hpp" -#include "asio/time_traits.hpp" -#include "asio/version.hpp" -#include "asio/wait_traits.hpp" -#include "asio/waitable_timer_service.hpp" -#include "asio/windows/basic_handle.hpp" -#include "asio/windows/basic_object_handle.hpp" -#include "asio/windows/basic_random_access_handle.hpp" -#include "asio/windows/basic_stream_handle.hpp" -#include "asio/windows/object_handle.hpp" -#include "asio/windows/object_handle_service.hpp" -#include "asio/windows/overlapped_ptr.hpp" -#include "asio/windows/random_access_handle.hpp" -#include "asio/windows/random_access_handle_service.hpp" -#include "asio/windows/stream_handle.hpp" -#include "asio/windows/stream_handle_service.hpp" -#include "asio/write.hpp" -#include "asio/write_at.hpp" - -#endif // ASIO_HPP http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a1e894/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio/async_result.hpp ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio/async_result.hpp b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio/async_result.hpp deleted file mode 100644 index b98d770..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/native/libhdfspp/third_party/asio-1.10.2/include/asio/async_result.hpp +++ /dev/null @@ -1,94 +0,0 @@ -// -// async_result.hpp -// ~~~~~~~~~~~~~~~~ -// -// Copyright (c) 2003-2014 Christopher M. Kohlhoff (chris at kohlhoff dot com) -// -// Distributed under the Boost Software License, Version 1.0. (See accompanying -// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) -// - -#ifndef ASIO_ASYNC_RESULT_HPP -#define ASIO_ASYNC_RESULT_HPP - -#if defined(_MSC_VER) && (_MSC_VER >= 1200) -# pragma once -#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) - -#include "asio/detail/config.hpp" -#include "asio/handler_type.hpp" - -#include "asio/detail/push_options.hpp" - -namespace asio { - -/// An interface for customising the behaviour of an initiating function. -/** - * This template may be specialised for user-defined handler types. - */ -template <typename Handler> -class async_result -{ -public: - /// The return type of the initiating function. - typedef void type; - - /// Construct an async result from a given handler. - /** - * When using a specalised async_result, the constructor has an opportunity - * to initialise some state associated with the handler, which is then - * returned from the initiating function. - */ - explicit async_result(Handler&) - { - } - - /// Obtain the value to be returned from the initiating function. - type get() - { - } -}; - -namespace detail { - -// Helper template to deduce the true type of a handler, capture a local copy -// of the handler, and then create an async_result for the handler. -template <typename Handler, typename Signature> -struct async_result_init -{ - explicit async_result_init(ASIO_MOVE_ARG(Handler) orig_handler) - : handler(ASIO_MOVE_CAST(Handler)(orig_handler)), - result(handler) - { - } - - typename handler_type<Handler, Signature>::type handler; - async_result<typename handler_type<Handler, Signature>::type> result; -}; - -template <typename Handler, typename Signature> -struct async_result_type_helper -{ - typedef typename async_result< - typename handler_type<Handler, Signature>::type - >::type type; -}; - -} // namespace detail -} // namespace asio - -#include "asio/detail/pop_options.hpp" - -#if defined(GENERATING_DOCUMENTATION) -# define ASIO_INITFN_RESULT_TYPE(h, sig) \ - void_or_deduced -#elif defined(_MSC_VER) && (_MSC_VER < 1500) -# define ASIO_INITFN_RESULT_TYPE(h, sig) \ - typename ::asio::detail::async_result_type_helper<h, sig>::type -#else -# define ASIO_INITFN_RESULT_TYPE(h, sig) \ - typename ::asio::async_result< \ - typename ::asio::handler_type<h, sig>::type>::type -#endif - -#endif // ASIO_ASYNC_RESULT_HPP
