Repository: hbase Updated Branches: refs/heads/HBASE-14850 a93c6a998 -> 1193812d7
HBASE-18338 [C++] Implement RpcTestServer (Xiaobing Zhou) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1193812d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1193812d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1193812d Branch: refs/heads/HBASE-14850 Commit: 1193812d784f407ab8596380e003b65de27a117a Parents: a93c6a9 Author: Enis Soztutar <e...@apache.org> Authored: Fri Jul 21 16:29:44 2017 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Fri Jul 21 16:29:44 2017 -0700 ---------------------------------------------------------------------- hbase-native-client/connection/BUCK | 13 ++ .../connection/client-handler.cc | 21 +- hbase-native-client/connection/client-handler.h | 7 +- hbase-native-client/connection/pipeline.cc | 12 +- .../connection/rpc-test-server-handler.cc | 77 ++++++ .../connection/rpc-test-server-handler.h | 47 ++++ .../connection/rpc-test-server.cc | 70 ++++++ .../connection/rpc-test-server.h | 50 ++++ hbase-native-client/connection/rpc-test.cc | 86 +++++++ hbase-native-client/connection/sasl-handler.h | 2 +- hbase-native-client/if/test.proto | 43 ++++ hbase-native-client/if/test_rpc_service.proto | 35 +++ hbase-native-client/serde/BUCK | 4 +- .../serde/client-deserializer-test.cc | 3 +- .../serde/client-serializer-test.cc | 2 +- hbase-native-client/serde/rpc-serde.cc | 234 +++++++++++++++++++ hbase-native-client/serde/rpc-serde.h | 141 +++++++++++ hbase-native-client/serde/rpc.cc | 222 ------------------ hbase-native-client/serde/rpc.h | 125 ---------- 19 files changed, 827 insertions(+), 367 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/BUCK b/hbase-native-client/connection/BUCK index c3119eb..aaf8fdb 
100644 --- a/hbase-native-client/connection/BUCK +++ b/hbase-native-client/connection/BUCK @@ -33,6 +33,8 @@ cxx_library( "service.h", "rpc-client.h", "sasl-util.h", + "rpc-test-server.h", + "rpc-test-server-handler.h", ], srcs=[ "client-dispatcher.cc", @@ -44,6 +46,8 @@ cxx_library( "rpc-client.cc", "sasl-handler.cc", "sasl-util.cc", + "rpc-test-server.cc", + "rpc-test-server-handler.cc", ], deps=[ "//if:if", @@ -68,3 +72,12 @@ cxx_test( deps=[ ":connection", ],) +cxx_test( + name="rpc-test", + srcs=[ + "rpc-test.cc", + ], + deps=[ + ":connection", + ], + run_test_separately=True,) http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/client-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.cc b/hbase-native-client/connection/client-handler.cc index 052c171..39227d3 100644 --- a/hbase-native-client/connection/client-handler.cc +++ b/hbase-native-client/connection/client-handler.cc @@ -35,9 +35,10 @@ using google::protobuf::Message; namespace hbase { ClientHandler::ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, - const std::string &server) + std::shared_ptr<Configuration> conf, const std::string &server) : user_name_(user_name), serde_(codec), + conf_(conf), server_(server), once_flag_(std::make_unique<std::once_flag>()), resp_msgs_( @@ -115,13 +116,17 @@ void ClientHandler::read(Context *ctx, std::unique_ptr<folly::IOBuf> buf) { } folly::Future<folly::Unit> ClientHandler::write(Context *ctx, std::unique_ptr<Request> r) { - // We need to send the header once. - // So use call_once to make sure that only one thread wins this. 
- std::call_once((*once_flag_), [ctx, this]() { - VLOG(3) << "Writing RPC Header to server: " << server_; - auto header = serde_.Header(user_name_); - ctx->fireWrite(std::move(header)); - }); + /* for RPC test, there's no need to send connection header */ + if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, + RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) { + // We need to send the header once. + // So use call_once to make sure that only one thread wins this. + std::call_once((*once_flag_), [ctx, this]() { + VLOG(3) << "Writing RPC Header to server: " << server_; + auto header = serde_.Header(user_name_); + ctx->fireWrite(std::move(header)); + }); + } VLOG(3) << "Writing RPC Request:" << r->DebugString() << ", server: " << server_; http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/client-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/client-handler.h b/hbase-native-client/connection/client-handler.h index 8de3a8b..b6f19a2 100644 --- a/hbase-native-client/connection/client-handler.h +++ b/hbase-native-client/connection/client-handler.h @@ -26,9 +26,10 @@ #include <string> #include <utility> +#include "core/configuration.h" #include "exceptions/exception.h" #include "serde/codec.h" -#include "serde/rpc.h" +#include "serde/rpc-serde.h" #include "utils/concurrent-map.h" // Forward decs. @@ -60,7 +61,8 @@ class ClientHandler * Create the handler * @param user_name the user name of the user running this process. */ - ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, const std::string &server); + ClientHandler(std::string user_name, std::shared_ptr<Codec> codec, + std::shared_ptr<Configuration> conf, const std::string &server); /** * Get bytes from the wire. 
@@ -79,6 +81,7 @@ class ClientHandler std::string user_name_; RpcSerde serde_; std::string server_; // for logging + std::shared_ptr<Configuration> conf_; // in flight requests std::unique_ptr<concurrent_map<uint32_t, std::shared_ptr<google::protobuf::Message>>> resp_msgs_; http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/pipeline.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/pipeline.cc b/hbase-native-client/connection/pipeline.cc index 2844752..9c790b6 100644 --- a/hbase-native-client/connection/pipeline.cc +++ b/hbase-native-client/connection/pipeline.cc @@ -32,7 +32,6 @@ namespace hbase { RpcPipelineFactory::RpcPipelineFactory(std::shared_ptr<Codec> codec, std::shared_ptr<Configuration> conf) : user_util_(), codec_(codec), conf_(conf) {} - SerializePipeline::Ptr RpcPipelineFactory::newPipeline( std::shared_ptr<folly::AsyncTransportWrapper> sock) { folly::SocketAddress addr; // for logging @@ -41,10 +40,15 @@ SerializePipeline::Ptr RpcPipelineFactory::newPipeline( auto pipeline = SerializePipeline::create(); pipeline->addBack(wangle::AsyncSocketHandler{sock}); pipeline->addBack(wangle::EventBaseHandler{}); - auto secure = security::User::IsSecurityEnabled(*conf_); - pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_}); + bool secure = false; + /* for RPC test, there's no need to setup Sasl */ + if (!conf_->GetBool(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, + RpcSerde::DEFAULT_HBASE_CLIENT_RPC_TEST_MODE)) { + secure = security::User::IsSecurityEnabled(*conf_); + pipeline->addBack(SaslHandler{user_util_.user_name(secure), conf_}); + } pipeline->addBack(wangle::LengthFieldBasedFrameDecoder{}); - pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, addr.describe()}); + pipeline->addBack(ClientHandler{user_util_.user_name(secure), codec_, conf_, addr.describe()}); pipeline->finalize(); return pipeline; } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server-handler.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server-handler.cc b/hbase-native-client/connection/rpc-test-server-handler.cc new file mode 100644 index 0000000..7d2f407 --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server-handler.cc @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +#include "connection/rpc-test-server-handler.h" +#include "if/RPC.pb.h" +#include "if/test.pb.h" + +namespace hbase { + +void RpcTestServerSerializeHandler::read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) { + buf->coalesce(); + pb::RequestHeader header; + + int used_bytes = serde_.ParseDelimited(buf.get(), &header); + VLOG(3) << "Read RPC RequestHeader size=" << used_bytes << " call_id=" << header.call_id(); + + auto received = CreateReceivedRequest(header.method_name()); + + buf->trimStart(used_bytes); + if (header.has_request_param() && received != nullptr) { + used_bytes = serde_.ParseDelimited(buf.get(), received->req_msg().get()); + VLOG(3) << "Read RPCRequest, buf length:" << buf->length() + << ", header PB length:" << used_bytes; + received->set_call_id(header.call_id()); + } + + if (received != nullptr) { + ctx->fireRead(std::move(received)); + } +} + +folly::Future<folly::Unit> RpcTestServerSerializeHandler::write(Context* ctx, + std::unique_ptr<Response> r) { + VLOG(3) << "Writing RPC Request"; + // Send the data down the pipeline. 
+ return ctx->fireWrite(serde_.Response(r->call_id(), r->resp_msg().get())); +} + +std::unique_ptr<Request> RpcTestServerSerializeHandler::CreateReceivedRequest( + const std::string& method_name) { + std::unique_ptr<Request> result = nullptr; + ; + if (method_name == "ping") { + result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), + std::make_shared<EmptyResponseProto>(), method_name); + } else if (method_name == "echo") { + result = std::make_unique<Request>(std::make_shared<EchoRequestProto>(), + std::make_shared<EchoResponseProto>(), method_name); + } else if (method_name == "error") { + result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), + std::make_shared<EmptyResponseProto>(), method_name); + } else if (method_name == "pause") { + result = std::make_unique<Request>(std::make_shared<PauseRequestProto>(), + std::make_shared<EmptyResponseProto>(), method_name); + } else if (method_name == "addr") { + result = std::make_unique<Request>(std::make_shared<EmptyRequestProto>(), + std::make_shared<AddrResponseProto>(), method_name); + } + return result; +} +} // end of namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server-handler.h b/hbase-native-client/connection/rpc-test-server-handler.h new file mode 100644 index 0000000..4c84615 --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server-handler.h @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <wangle/channel/Handler.h> + +#include "connection/request.h" +#include "connection/response.h" +#include "serde/rpc-serde.h" + +using namespace hbase; + +namespace hbase { +// A real rpc server would probably use generated client/server stubs +class RpcTestServerSerializeHandler + : public wangle::Handler<std::unique_ptr<folly::IOBuf>, std::unique_ptr<Request>, + std::unique_ptr<Response>, std::unique_ptr<folly::IOBuf>> { + public: + RpcTestServerSerializeHandler() : serde_() {} + + void read(Context* ctx, std::unique_ptr<folly::IOBuf> buf) override; + + folly::Future<folly::Unit> write(Context* ctx, std::unique_ptr<Response> r) override; + + private: + std::unique_ptr<Request> CreateReceivedRequest(const std::string& method_name); + + private: + hbase::RpcSerde serde_; +}; +} // end of namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server.cc b/hbase-native-client/connection/rpc-test-server.cc new file mode 100644 index 0000000..d3a30b1 --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server.cc @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#include <wangle/channel/AsyncSocketHandler.h> +#include <wangle/channel/EventBaseHandler.h> +#include <wangle/codec/LengthFieldBasedFrameDecoder.h> +#include <wangle/codec/LengthFieldPrepender.h> +#include <wangle/service/ServerDispatcher.h> + +#include "connection/rpc-test-server-handler.h" +#include "connection/rpc-test-server.h" +#include "if/test.pb.h" + +namespace hbase { + +RpcTestServerSerializePipeline::Ptr RpcTestServerPipelineFactory::newPipeline( + std::shared_ptr<AsyncTransportWrapper> sock) { + auto pipeline = RpcTestServerSerializePipeline::create(); + pipeline->addBack(AsyncSocketHandler(sock)); + // ensure we can write from any thread + pipeline->addBack(EventBaseHandler()); + pipeline->addBack(LengthFieldBasedFrameDecoder()); + pipeline->addBack(RpcTestServerSerializeHandler()); + pipeline->addBack( + MultiplexServerDispatcher<std::unique_ptr<Request>, std::unique_ptr<Response>>(&service_)); + pipeline->finalize(); + + return pipeline; +} + +Future<std::unique_ptr<Response>> RpcTestService::operator()(std::unique_ptr<Request> request) { + /* build Response */ + auto response = std::make_unique<Response>(); + response->set_call_id(request->call_id()); + std::string method_name = request->method(); + + if (method_name == "ping") { + auto pb_resp_msg = std::make_shared<EmptyResponseProto>(); + response->set_resp_msg(pb_resp_msg); + } else if (method_name == "echo") 
{ + auto pb_resp_msg = std::make_shared<EchoResponseProto>(); + auto pb_req_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg()); + pb_resp_msg->set_message(pb_req_msg->message()); + response->set_resp_msg(pb_resp_msg); + } else if (method_name == "error") { + // TODO: + } else if (method_name == "pause") { + // TODO: + } else if (method_name == "addr") { + // TODO: + } + + return folly::makeFuture<std::unique_ptr<Response>>(std::move(response)); +} +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test-server.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test-server.h b/hbase-native-client/connection/rpc-test-server.h new file mode 100644 index 0000000..c3225ff --- /dev/null +++ b/hbase-native-client/connection/rpc-test-server.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ +#pragma once +#include <wangle/concurrent/CPUThreadPoolExecutor.h> +#include <wangle/service/ExecutorFilter.h> +#include <wangle/service/Service.h> + +#include "connection/request.h" +#include "connection/response.h" + +using namespace hbase; +using namespace folly; +using namespace wangle; + +namespace hbase { +using RpcTestServerSerializePipeline = wangle::Pipeline<IOBufQueue&, std::unique_ptr<Response>>; + +class RpcTestService : public Service<std::unique_ptr<Request>, std::unique_ptr<Response>> { + public: + RpcTestService() {} + virtual ~RpcTestService() = default; + Future<std::unique_ptr<Response>> operator()(std::unique_ptr<Request> request) override; +}; + +class RpcTestServerPipelineFactory : public PipelineFactory<RpcTestServerSerializePipeline> { + public: + RpcTestServerSerializePipeline::Ptr newPipeline( + std::shared_ptr<AsyncTransportWrapper> sock) override; + + private: + ExecutorFilter<std::unique_ptr<Request>, std::unique_ptr<Response>> service_{ + std::make_shared<CPUThreadPoolExecutor>(1), std::make_shared<RpcTestService>()}; +}; +} // end of namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/rpc-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/rpc-test.cc b/hbase-native-client/connection/rpc-test.cc new file mode 100644 index 0000000..d4cd89f --- /dev/null +++ b/hbase-native-client/connection/rpc-test.cc @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <wangle/bootstrap/ClientBootstrap.h> +#include <wangle/channel/Handler.h> + +#include <folly/Logging.h> +#include <folly/SocketAddress.h> +#include <folly/String.h> +#include <folly/experimental/TestUtil.h> +#include <gflags/gflags.h> +#include <glog/logging.h> +#include <gtest/gtest.h> +#include <boost/thread.hpp> + +#include "connection/rpc-client.h" +#include "if/test.pb.h" +#include "rpc-test-server.h" +#include "security/user.h" +#include "serde/rpc-serde.h" + +using namespace wangle; +using namespace folly; +using namespace hbase; + +DEFINE_int32(port, 0, "test server port"); + +TEST(RpcTestServer, echo) { + /* create conf */ + auto conf = std::make_shared<Configuration>(); + conf->Set(RpcSerde::HBASE_CLIENT_RPC_TEST_MODE, "true"); + + /* create rpc test server */ + auto server = std::make_shared<ServerBootstrap<RpcTestServerSerializePipeline>>(); + server->childPipeline(std::make_shared<RpcTestServerPipelineFactory>()); + server->bind(FLAGS_port); + folly::SocketAddress server_addr; + server->getSockets()[0]->getAddress(&server_addr); + + /* create RpcClient */ + auto io_executor = std::make_shared<wangle::IOThreadPoolExecutor>(1); + + auto rpc_client = std::make_shared<RpcClient>(io_executor, nullptr, conf); + + /** + * test echo + */ + try { + std::string greetings = "hello, hbase server!"; + auto request = std::make_unique<Request>(std::make_shared<EchoRequestProto>(), + std::make_shared<EchoResponseProto>(), "echo"); + auto pb_msg = std::static_pointer_cast<EchoRequestProto>(request->req_msg()); + 
pb_msg->set_message(greetings); + + /* sending out request */ + rpc_client + ->AsyncCall(server_addr.getAddressStr(), server_addr.getPort(), std::move(request), + hbase::security::User::defaultUser()) + .then([=](std::unique_ptr<Response> response) { + auto pb_resp = std::static_pointer_cast<EchoResponseProto>(response->resp_msg()); + VLOG(1) << "message returned: " + pb_resp->message(); + EXPECT_EQ(greetings, pb_resp->message()); + }); + } catch (const std::exception& e) { + throw e; + } + + server->stop(); + server->join(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/connection/sasl-handler.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/connection/sasl-handler.h b/hbase-native-client/connection/sasl-handler.h index f606a23..81f4e81 100644 --- a/hbase-native-client/connection/sasl-handler.h +++ b/hbase-native-client/connection/sasl-handler.h @@ -30,7 +30,7 @@ #include "connection/sasl-util.h" #include "connection/service.h" #include "security/user.h" -#include "serde/rpc.h" +#include "serde/rpc-serde.h" namespace hbase { http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/if/test.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/test.proto b/hbase-native-client/if/test.proto new file mode 100644 index 0000000..72b68e9 --- /dev/null +++ b/hbase-native-client/if/test.proto @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated"; +option java_outer_classname = "TestProtos"; +option java_generate_equals_and_hash = true; + +message EmptyRequestProto { +} + +message EmptyResponseProto { +} + +message EchoRequestProto { + required string message = 1; +} + +message EchoResponseProto { + required string message = 1; +} + +message PauseRequestProto { + required uint32 ms = 1; +} + +message AddrResponseProto { + required string addr = 1; +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/if/test_rpc_service.proto ---------------------------------------------------------------------- diff --git a/hbase-native-client/if/test_rpc_service.proto b/hbase-native-client/if/test_rpc_service.proto new file mode 100644 index 0000000..5f91dc4 --- /dev/null +++ b/hbase-native-client/if/test_rpc_service.proto @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +option java_package = "org.apache.hadoop.hbase.ipc.protobuf.generated"; +option java_outer_classname = "TestRpcServiceProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; + +import "test.proto"; + + +/** + * A protobuf service for use in tests + */ +service TestProtobufRpcProto { + rpc ping(EmptyRequestProto) returns (EmptyResponseProto); + rpc echo(EchoRequestProto) returns (EchoResponseProto); + rpc error(EmptyRequestProto) returns (EmptyResponseProto); + rpc pause(PauseRequestProto) returns (EmptyResponseProto); + rpc addr(EmptyRequestProto) returns (AddrResponseProto); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/BUCK ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/BUCK b/hbase-native-client/serde/BUCK index 18e949c..a765884 100644 --- a/hbase-native-client/serde/BUCK +++ b/hbase-native-client/serde/BUCK @@ -22,13 +22,13 @@ cxx_library( "cell-outputstream.h", "codec.h", "region-info.h", - "rpc.h", + "rpc-serde.h", "server-name.h", "table-name.h", "zk.h", ], srcs=[ - "rpc.cc", + "rpc-serde.cc", "zk.cc", ], deps=[ http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/client-deserializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-deserializer-test.cc b/hbase-native-client/serde/client-deserializer-test.cc index 054684d..1856047 100644 --- 
a/hbase-native-client/serde/client-deserializer-test.cc +++ b/hbase-native-client/serde/client-deserializer-test.cc @@ -16,12 +16,11 @@ * limitations under the License. * */ -#include "serde/rpc.h" - #include <folly/io/IOBuf.h> #include <gtest/gtest.h> #include "if/Client.pb.h" +#include "rpc-serde.h" using namespace hbase; using folly::IOBuf; http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/client-serializer-test.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/client-serializer-test.cc b/hbase-native-client/serde/client-serializer-test.cc index 7d8b29c..306f2c2 100644 --- a/hbase-native-client/serde/client-serializer-test.cc +++ b/hbase-native-client/serde/client-serializer-test.cc @@ -24,7 +24,7 @@ #include "if/HBase.pb.h" #include "if/RPC.pb.h" -#include "serde/rpc.h" +#include "rpc-serde.h" using namespace hbase; using namespace hbase::pb; http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc-serde.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc-serde.cc b/hbase-native-client/serde/rpc-serde.cc new file mode 100644 index 0000000..9e1f79a --- /dev/null +++ b/hbase-native-client/serde/rpc-serde.cc @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <folly/Conv.h> +#include <folly/Logging.h> +#include <folly/io/Cursor.h> +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> +#include <google/protobuf/message.h> +#include <boost/algorithm/string.hpp> + +#include <utility> + +#include "if/RPC.pb.h" +#include "rpc-serde.h" +#include "utils/version.h" + +using folly::IOBuf; +using folly::io::RWPrivateCursor; +using google::protobuf::Message; +using google::protobuf::io::ArrayInputStream; +using google::protobuf::io::ArrayOutputStream; +using google::protobuf::io::CodedInputStream; +using google::protobuf::io::CodedOutputStream; +using google::protobuf::io::ZeroCopyOutputStream; + +namespace hbase { + +static const std::string PREAMBLE = "HBas"; +static const std::string INTERFACE = "ClientService"; +static const uint8_t RPC_VERSION = 0; +static const uint8_t DEFAULT_AUTH_TYPE = 80; +static const uint8_t KERBEROS_AUTH_TYPE = 81; + +int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { + if (buf == nullptr || msg == nullptr) { + return -2; + } + + DCHECK(!buf->isChained()); + + ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())}; + CodedInputStream coded_stream{&ais}; + + uint32_t msg_size; + + // Try and read the varint. + if (coded_stream.ReadVarint32(&msg_size) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t"; + return -3; + } + + coded_stream.PushLimit(msg_size); + // Parse the message. 
+ if (msg->MergeFromCodedStream(&coded_stream) == false) { + FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a protobuf message from data."; + return -4; + } + + // Make sure all the data was consumed. + if (coded_stream.ConsumedEntireMessage() == false) { + FB_LOG_EVERY_MS(ERROR, 1000) << "Orphaned data left after reading protobuf message"; + return -5; + } + + return coded_stream.CurrentPosition(); +} + +RpcSerde::RpcSerde() {} + +RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {} + +std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) { + auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); + magic->append(2); + RWPrivateCursor c(magic.get()); + c.skip(4); + // Version + c.write(RPC_VERSION); + if (secure) { + // for now support only KERBEROS (DIGEST is not supported) + c.write(KERBEROS_AUTH_TYPE); + } else { + c.write(DEFAULT_AUTH_TYPE); + } + return magic; +} + +std::unique_ptr<IOBuf> RpcSerde::Header(const std::string &user) { + pb::ConnectionHeader h; + + // TODO(eclark): Make this not a total lie. + h.mutable_user_info()->set_effective_user(user); + // The service name that we want to talk to. + // + // Right now we're completely ignoring the service interface. + // That may or may not be the correct thing to do. + // It worked for a while with the java client; until it + // didn't. 
+ // TODO: send the service name and user from the RpcClient + h.set_service_name(INTERFACE); + + std::unique_ptr<pb::VersionInfo> version_info = CreateVersionInfo(); + + h.set_allocated_version_info(version_info.release()); + + if (codec_ != nullptr) { + h.set_cell_block_codec_class(codec_->java_class_name()); + } + return PrependLength(SerializeMessage(h)); +} + +std::unique_ptr<pb::VersionInfo> RpcSerde::CreateVersionInfo() { + std::unique_ptr<pb::VersionInfo> version_info = std::make_unique<pb::VersionInfo>(); + version_info->set_user(Version::user); + version_info->set_revision(Version::revision); + version_info->set_url(Version::url); + version_info->set_date(Version::date); + version_info->set_src_checksum(Version::src_checksum); + version_info->set_version(Version::version); + + std::string version{Version::version}; + std::vector<std::string> version_parts; + boost::split(version_parts, version, boost::is_any_of("."), boost::token_compress_on); + uint32_t major_version = 0, minor_version = 0; + if (version_parts.size() >= 2) { + version_info->set_version_major(folly::to<uint32_t>(version_parts[0])); + version_info->set_version_minor(folly::to<uint32_t>(version_parts[1])); + } + + VLOG(1) << "Client VersionInfo:" << version_info->ShortDebugString(); + return version_info; +} + +std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::string &method, + const Message *msg) { + pb::RequestHeader rq; + rq.set_method_name(method); + rq.set_call_id(call_id); + rq.set_request_param(msg != nullptr); + auto ser_header = SerializeDelimited(rq); + if (msg != nullptr) { + auto ser_req = SerializeDelimited(*msg); + ser_header->appendChain(std::move(ser_req)); + } + + return PrependLength(std::move(ser_header)); +} + +std::unique_ptr<folly::IOBuf> RpcSerde::Response(const uint32_t call_id, + const google::protobuf::Message *msg) { + pb::ResponseHeader rh; + rh.set_call_id(call_id); + auto ser_header = SerializeDelimited(rh); + auto ser_resp = 
SerializeDelimited(*msg); + ser_header->appendChain(std::move(ser_resp)); + + return PrependLength(std::move(ser_header)); +} + +std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, + uint32_t offset, uint32_t length) { + if (codec_ == nullptr) { + return nullptr; + } + return codec_->CreateDecoder(std::move(buf), offset, length); +} + +std::unique_ptr<IOBuf> RpcSerde::PrependLength(std::unique_ptr<IOBuf> msg) { + // Java ints are 4 bytes long. So create a buffer that large + auto len_buf = IOBuf::create(4); + // Then make those bytes visible. + len_buf->append(4); + + RWPrivateCursor c(len_buf.get()); + // Get the size of the data to be pushed out the network. + auto size = msg->computeChainDataLength(); + + // Write the length to this IOBuf. + c.writeBE(static_cast<uint32_t>(size)); + + // Then attach the original to the back of len_buf + len_buf->appendChain(std::move(msg)); + return len_buf; +} + +std::unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) { + // Get the buffer size needed for just the message. + int msg_size = msg.ByteSize(); + int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size; + + // Create a buffer big enough to hold the varint and the object. + auto buf = IOBuf::create(buf_size); + buf->append(buf_size); + + // Create the array output stream. + ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())}; + // Wrap the ArrayOutputStream in the coded output stream to allow writing + // Varint32 + CodedOutputStream cos{&aos}; + + // Write out the size. + cos.WriteVarint32(msg_size); + + // Now write the rest out. + // We're using the protobuf output streams here to keep track + // of where in the output array we are rather than IOBuf. + msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size)); + + // Return the buffer. + return buf; +} +// TODO(eclark): Make this 1 copy.
+std::unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) { + auto buf = IOBuf::copyBuffer(msg.SerializeAsString()); + return buf; +} +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc-serde.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc-serde.h b/hbase-native-client/serde/rpc-serde.h new file mode 100644 index 0000000..0e1d44e --- /dev/null +++ b/hbase-native-client/serde/rpc-serde.h @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +#pragma once + +#include <memory> +#include <string> + +#include "if/HBase.pb.h" +#include "serde/cell-scanner.h" +#include "serde/codec.h" + +// Forward +namespace folly { +class IOBuf; +} +namespace google { +namespace protobuf { +class Message; +} +} + +namespace hbase { + +/** + * @brief Class for serializing and deserializing rpc formatted data. + * + * RpcSerde is the one stop shop for reading/writing data to HBase daemons. + * It should throw exceptions if anything goes wrong. + */ +class RpcSerde { + public: + RpcSerde(); + /** + * Constructor assumes the default auth type.
+ */ + RpcSerde(std::shared_ptr<Codec> codec); + + /** + * Destructor. This is provided just for testing purposes. + */ + virtual ~RpcSerde() = default; + + /** + * Parse a message in the delimited format. + * + * A message in delimited format consists of the following: + * + * - a protobuf var int32. + * - A protobuf object serialized. + */ + int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg); + + /** + * Create a new connection preamble in a new IOBuf. + */ + static std::unique_ptr<folly::IOBuf> Preamble(bool secure); + + /** + * Create the header protobuf object and serialize it to a new IOBuf. + * Header is in the following format: + * + * - Big endian length + * - ConnectionHeader object serialized out. + */ + std::unique_ptr<folly::IOBuf> Header(const std::string &user); + + /** + * Take ownership of the passed buffer, and create a CellScanner using the + * Codec class to parse Cells out of the wire. + */ + std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset, + uint32_t length); + + /** + * Serialize a request message into a protobuf. + * Request consists of: + * + * - Big endian length + * - RequestHeader object + * - The passed in Message object + */ + std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method, + const google::protobuf::Message *msg); + + /** + * Serialize a response message into a protobuf. + * Response consists of: + * + * - Big endian length + * - ResponseHeader object + * - The passed in Message object + */ + std::unique_ptr<folly::IOBuf> Response(const uint32_t call_id, + const google::protobuf::Message *msg); + + /** + * Serialize a message in the delimited format. + * Delimited format consists of the following: + * + * - A protobuf var int32 + * - The message object serialized after that. + */ + std::unique_ptr<folly::IOBuf> SerializeDelimited(const google::protobuf::Message &msg); + + /** + * Serialize a message.
This does not add any length prepend. + */ + std::unique_ptr<folly::IOBuf> SerializeMessage(const google::protobuf::Message &msg); + + /** + * Prepend a length IOBuf to the given IOBuf chain. + * This involves no copies or moves of the passed in data. + */ + std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg); + + public: + static constexpr const char *HBASE_CLIENT_RPC_TEST_MODE = "hbase.client.rpc.test.mode"; + static constexpr const bool DEFAULT_HBASE_CLIENT_RPC_TEST_MODE = false; + + private: + /* data */ + std::shared_ptr<Codec> codec_; + std::unique_ptr<pb::VersionInfo> CreateVersionInfo(); +}; +} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc.cc ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc.cc b/hbase-native-client/serde/rpc.cc deleted file mode 100644 index 957a317..0000000 --- a/hbase-native-client/serde/rpc.cc +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -#include "serde/rpc.h" - -#include <folly/Conv.h> -#include <folly/Logging.h> -#include <folly/io/Cursor.h> -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> -#include <google/protobuf/message.h> -#include <boost/algorithm/string.hpp> - -#include <utility> - -#include "if/RPC.pb.h" -#include "utils/version.h" - -using folly::IOBuf; -using folly::io::RWPrivateCursor; -using google::protobuf::Message; -using google::protobuf::io::ArrayInputStream; -using google::protobuf::io::ArrayOutputStream; -using google::protobuf::io::CodedInputStream; -using google::protobuf::io::CodedOutputStream; -using google::protobuf::io::ZeroCopyOutputStream; - -namespace hbase { - -static const std::string PREAMBLE = "HBas"; -static const std::string INTERFACE = "ClientService"; -static const uint8_t RPC_VERSION = 0; -static const uint8_t DEFAULT_AUTH_TYPE = 80; -static const uint8_t KERBEROS_AUTH_TYPE = 81; - -int RpcSerde::ParseDelimited(const IOBuf *buf, Message *msg) { - if (buf == nullptr || msg == nullptr) { - return -2; - } - - DCHECK(!buf->isChained()); - - ArrayInputStream ais{buf->data(), static_cast<int>(buf->length())}; - CodedInputStream coded_stream{&ais}; - - uint32_t msg_size; - - // Try and read the varint. - if (coded_stream.ReadVarint32(&msg_size) == false) { - FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a var uint32_t"; - return -3; - } - - coded_stream.PushLimit(msg_size); - // Parse the message. - if (msg->MergeFromCodedStream(&coded_stream) == false) { - FB_LOG_EVERY_MS(ERROR, 1000) << "Unable to read a protobuf message from data."; - return -4; - } - - // Make sure all the data was consumed. 
- if (coded_stream.ConsumedEntireMessage() == false) { - FB_LOG_EVERY_MS(ERROR, 1000) << "Orphaned data left after reading protobuf message"; - return -5; - } - - return coded_stream.CurrentPosition(); -} - -RpcSerde::RpcSerde(std::shared_ptr<Codec> codec) : codec_(codec) {} - -std::unique_ptr<IOBuf> RpcSerde::Preamble(bool secure) { - auto magic = IOBuf::copyBuffer(PREAMBLE, 0, 2); - magic->append(2); - RWPrivateCursor c(magic.get()); - c.skip(4); - // Version - c.write(RPC_VERSION); - if (secure) { - // for now support only KERBEROS (DIGEST is not supported) - c.write(KERBEROS_AUTH_TYPE); - } else { - c.write(DEFAULT_AUTH_TYPE); - } - return magic; -} - -std::unique_ptr<IOBuf> RpcSerde::Header(const std::string &user) { - pb::ConnectionHeader h; - - // TODO(eclark): Make this not a total lie. - h.mutable_user_info()->set_effective_user(user); - // The service name that we want to talk to. - // - // Right now we're completely ignoring the service interface. - // That may or may not be the correct thing to do. - // It worked for a while with the java client; until it - // didn't. 
- // TODO: send the service name and user from the RpcClient - h.set_service_name(INTERFACE); - - std::unique_ptr<pb::VersionInfo> version_info = CreateVersionInfo(); - - h.set_allocated_version_info(version_info.release()); - - if (codec_ != nullptr) { - h.set_cell_block_codec_class(codec_->java_class_name()); - } - return PrependLength(SerializeMessage(h)); -} - -std::unique_ptr<pb::VersionInfo> RpcSerde::CreateVersionInfo() { - std::unique_ptr<pb::VersionInfo> version_info = std::make_unique<pb::VersionInfo>(); - version_info->set_user(Version::user); - version_info->set_revision(Version::revision); - version_info->set_url(Version::url); - version_info->set_date(Version::date); - version_info->set_src_checksum(Version::src_checksum); - version_info->set_version(Version::version); - - std::string version{Version::version}; - std::vector<std::string> version_parts; - boost::split(version_parts, version, boost::is_any_of("."), boost::token_compress_on); - uint32_t major_version = 0, minor_version = 0; - if (version_parts.size() >= 2) { - version_info->set_version_major(folly::to<uint32_t>(version_parts[0])); - version_info->set_version_minor(folly::to<uint32_t>(version_parts[1])); - } - - VLOG(1) << "Client VersionInfo:" << version_info->ShortDebugString(); - return version_info; -} - -std::unique_ptr<IOBuf> RpcSerde::Request(const uint32_t call_id, const std::string &method, - const Message *msg) { - pb::RequestHeader rq; - rq.set_method_name(method); - rq.set_call_id(call_id); - rq.set_request_param(msg != nullptr); - auto ser_header = SerializeDelimited(rq); - if (msg != nullptr) { - auto ser_req = SerializeDelimited(*msg); - ser_header->appendChain(std::move(ser_req)); - } - - return PrependLength(std::move(ser_header)); -} - -std::unique_ptr<CellScanner> RpcSerde::CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, - uint32_t offset, uint32_t length) { - if (codec_ == nullptr) { - return nullptr; - } - return codec_->CreateDecoder(std::move(buf), offset, 
length); -} - -std::unique_ptr<IOBuf> RpcSerde::PrependLength(std::unique_ptr<IOBuf> msg) { - // Java ints are 4 long. So create a buffer that large - auto len_buf = IOBuf::create(4); - // Then make those bytes visible. - len_buf->append(4); - - RWPrivateCursor c(len_buf.get()); - // Get the size of the data to be pushed out the network. - auto size = msg->computeChainDataLength(); - - // Write the length to this IOBuf. - c.writeBE(static_cast<uint32_t>(size)); - - // Then attach the origional to the back of len_buf - len_buf->appendChain(std::move(msg)); - return len_buf; -} - -std::unique_ptr<IOBuf> RpcSerde::SerializeDelimited(const Message &msg) { - // Get the buffer size needed for just the message. - int msg_size = msg.ByteSize(); - int buf_size = CodedOutputStream::VarintSize32(msg_size) + msg_size; - - // Create a buffer big enough to hold the varint and the object. - auto buf = IOBuf::create(buf_size); - buf->append(buf_size); - - // Create the array output stream. - ArrayOutputStream aos{buf->writableData(), static_cast<int>(buf->length())}; - // Wrap the ArrayOuputStream in the coded output stream to allow writing - // Varint32 - CodedOutputStream cos{&aos}; - - // Write out the size. - cos.WriteVarint32(msg_size); - - // Now write the rest out. - // We're using the protobuf output streams here to keep track - // of where in the output array we are rather than IOBuf. - msg.SerializeWithCachedSizesToArray(cos.GetDirectBufferForNBytesAndAdvance(msg_size)); - - // Return the buffer. - return buf; -} -// TODO(eclark): Make this 1 copy. 
-std::unique_ptr<IOBuf> RpcSerde::SerializeMessage(const Message &msg) { - auto buf = IOBuf::copyBuffer(msg.SerializeAsString()); - return buf; -} -} // namespace hbase http://git-wip-us.apache.org/repos/asf/hbase/blob/1193812d/hbase-native-client/serde/rpc.h ---------------------------------------------------------------------- diff --git a/hbase-native-client/serde/rpc.h b/hbase-native-client/serde/rpc.h deleted file mode 100644 index 15aa1ee..0000000 --- a/hbase-native-client/serde/rpc.h +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -#pragma once - -#include <memory> -#include <string> - -#include "if/HBase.pb.h" -#include "serde/cell-scanner.h" -#include "serde/codec.h" - -// Forward -namespace folly { -class IOBuf; -} -namespace google { -namespace protobuf { -class Message; -} -} - -namespace hbase { - -/** - * @brief Class for serializing a deserializing rpc formatted data. - * - * RpcSerde is the one stop shop for reading/writing data to HBase daemons. - * It should throw exceptions if anything goes wrong. - */ -class RpcSerde { - public: - /** - * Constructor assumes the default auth type. - */ - RpcSerde(std::shared_ptr<Codec> codec); - - /** - * Destructor. 
This is provided just for testing purposes. - */ - virtual ~RpcSerde() = default; - - /** - * Pase a message in the delimited format. - * - * A message in delimited format consists of the following: - * - * - a protobuf var int32. - * - A protobuf object serialized. - */ - int ParseDelimited(const folly::IOBuf *buf, google::protobuf::Message *msg); - - /** - * Create a new connection preamble in a new IOBuf. - */ - static std::unique_ptr<folly::IOBuf> Preamble(bool secure); - - /** - * Create the header protobuf object and serialize it to a new IOBuf. - * Header is in the following format: - * - * - Big endian length - * - ConnectionHeader object serialized out. - */ - std::unique_ptr<folly::IOBuf> Header(const std::string &user); - - /** - * Take ownership of the passed buffer, and create a CellScanner using the - * Codec class to parse Cells out of the wire. - */ - std::unique_ptr<CellScanner> CreateCellScanner(std::unique_ptr<folly::IOBuf> buf, uint32_t offset, - uint32_t length); - - /** - * Serialize a request message into a protobuf. - * Request consists of: - * - * - Big endian length - * - RequestHeader object - * - The passed in Message object - */ - std::unique_ptr<folly::IOBuf> Request(const uint32_t call_id, const std::string &method, - const google::protobuf::Message *msg); - - /** - * Serialize a message in the delimited format. - * Delimited format consists of the following: - * - * - A protobuf var int32 - * - The message object seriailized after that. - */ - std::unique_ptr<folly::IOBuf> SerializeDelimited(const google::protobuf::Message &msg); - - /** - * Serilalize a message. This does not add any length prepend. - */ - std::unique_ptr<folly::IOBuf> SerializeMessage(const google::protobuf::Message &msg); - - /** - * Prepend a length IOBuf to the given IOBuf chain. - * This involves no copies or moves of the passed in data. 
- */ - std::unique_ptr<folly::IOBuf> PrependLength(std::unique_ptr<folly::IOBuf> msg); - - private: - /* data */ - std::shared_ptr<Codec> codec_; - std::unique_ptr<pb::VersionInfo> CreateVersionInfo(); -}; -} // namespace hbase