IMPALA-4670: Introduces RpcMgr class This patch introduces a new class, RpcMgr which is the abstraction layer around KRPC core mechanics. It provides an interface RegisterService() for various services to register themselves.
Kudu RPC is invoked via an auto-generated interface called proxy. This change implements an inline wrapper for KRPC client to obtain a proxy for a particular service exported by remote server. Last but not least, the RpcMgr will start all registered services if FLAGS_use_krpc is true. This patch hasn't yet added any service except for some test services in rpc-mgr-test. This patch is based on an abandoned patch by Henry Robinson. Testing done: a new backend test is added to exercise the code and demonstrate the way to interact with KRPC framework. Change-Id: I8adb10ae375d7bf945394c38a520f12d29cf7b46 Reviewed-on: http://gerrit.cloudera.org:8080/7901 Reviewed-by: Michael Ho <[email protected]> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/dd4c6be8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/dd4c6be8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/dd4c6be8 Branch: refs/heads/master Commit: dd4c6be8e082d4dd42e099e9a03ed23fdae9c13b Parents: e6594bf Author: Michael Ho <[email protected]> Authored: Sun Aug 20 13:53:34 2017 -0700 Committer: Impala Public Jenkins <[email protected]> Committed: Fri Oct 6 07:09:55 2017 +0000 ---------------------------------------------------------------------- CMakeLists.txt | 3 +- be/src/exec/kudu-util.h | 13 ++ be/src/rpc/CMakeLists.txt | 12 ++ be/src/rpc/rpc-mgr-test.cc | 251 ++++++++++++++++++++++++++ be/src/rpc/rpc-mgr.cc | 115 ++++++++++++ be/src/rpc/rpc-mgr.h | 181 +++++++++++++++++++ be/src/rpc/rpc-mgr.inline.h | 45 +++++ be/src/runtime/exec-env.cc | 23 ++- be/src/runtime/exec-env.h | 22 ++- be/src/scheduling/scheduler-test-util.cc | 12 +- be/src/scheduling/scheduler.cc | 27 +-- be/src/scheduling/scheduler.h | 8 +- be/src/service/impala-server.cc | 16 +- be/src/util/counting-barrier.h | 3 + be/src/util/network-util.cc | 11 ++ be/src/util/network-util.h | 12 +- cmake_modules/FindKRPC.cmake | 4 +- common/protobuf/CMakeLists.txt | 31 ++++ common/protobuf/rpc_test.proto | 42 +++++ 19 files changed, 780 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/CMakeLists.txt b/CMakeLists.txt index d60487f..865f7b2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -347,12 +347,13 @@ add_subdirectory(common/function-registry) add_subdirectory(common/thrift) add_subdirectory(common/fbs) add_subdirectory(common/yarn-extras) +add_subdirectory(common/protobuf) add_subdirectory(be) add_subdirectory(fe) add_subdirectory(ext-data-source) # Build target for all generated files which most backend code depends on -add_custom_target(gen-deps ALL DEPENDS thrift-deps) +add_custom_target(gen-deps ALL DEPENDS thrift-deps proto-deps) add_custom_target(tarballs ALL DEPENDS shell_tarball) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/exec/kudu-util.h ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-util.h b/be/src/exec/kudu-util.h index 28c6b27..6113401 100644 --- a/be/src/exec/kudu-util.h +++ b/be/src/exec/kudu-util.h @@ -81,5 +81,18 @@ Status WriteKuduValue(int col, PrimitiveType type, const void* value, /// Takes a Kudu client DataType and returns the corresponding Impala ColumnType. ColumnType KuduDataTypeToColumnType(kudu::client::KuduColumnSchema::DataType type); +/// Utility function for creating an Impala Status object based on a kudu::Status object. +/// 'k_status' is the kudu::Status object. +/// 'prepend' is a string to be prepended to details of 'k_status' when creating the +/// Impala Status object. +/// Note that we don't translate the kudu::Status error code to Impala error code +/// so the returned status' type is always of TErrorCode::GENERAL. +inline Status FromKuduStatus( + const kudu::Status& k_status, const std::string prepend = "") { + if (LIKELY(k_status.ok())) return Status::OK(); + if (prepend.empty()) return Status(k_status.ToString()); + return Status(strings::Substitute("$0: $1", prepend, k_status.ToString())); +} + } /// namespace impala #endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt index d837f6c..95be43f 100644 --- a/be/src/rpc/CMakeLists.txt +++ b/be/src/rpc/CMakeLists.txt @@ -21,8 +21,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc") # where to put generated binaries set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/rpc") +# Mark the protobuf files as generated +set_source_files_properties(${RPC_TEST_PROTO_SRCS} PROPERTIES GENERATED TRUE) + add_library(Rpc authentication.cc + rpc-mgr.cc rpc-trace.cc TAcceptQueueServer.cpp thrift-util.cc @@ -35,3 +39,11 @@ add_dependencies(Rpc gen-deps) ADD_BE_TEST(thrift-util-test) ADD_BE_TEST(thrift-server-test) ADD_BE_TEST(authentication-test) + +ADD_BE_TEST(rpc-mgr-test) +add_dependencies(rpc-mgr-test rpc_test_proto) +target_link_libraries(rpc-mgr-test rpc_test_proto) + +add_library(rpc_test_proto ${RPC_TEST_PROTO_SRCS}) +add_dependencies(rpc_test_proto rpc_test_proto_tgt krpc) +target_link_libraries(rpc_test_proto krpc) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr-test.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr-test.cc b/be/src/rpc/rpc-mgr-test.cc new file mode 100644 index 0000000..3eb0d92 --- /dev/null +++ b/be/src/rpc/rpc-mgr-test.cc @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "rpc/rpc-mgr.inline.h" + +#include "common/init.h" +#include "exec/kudu-util.h" +#include "kudu/rpc/rpc_context.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/rpc_sidecar.h" +#include "kudu/util/monotime.h" +#include "kudu/util/status.h" +#include "testutil/gtest-util.h" +#include "util/counting-barrier.h" +#include "util/network-util.h" +#include "util/test-info.h" + +#include "gen-cpp/rpc_test.proxy.h" +#include "gen-cpp/rpc_test.service.h" + +#include "common/names.h" + +using kudu::rpc::ErrorStatusPB; +using kudu::rpc::ServiceIf; +using kudu::rpc::RpcController; +using kudu::rpc::RpcContext; +using kudu::rpc::RpcSidecar; +using kudu::MonoDelta; +using kudu::Slice; + +using namespace std; + +DECLARE_int32(num_reactor_threads); +DECLARE_int32(num_acceptor_threads); +DECLARE_string(hostname); + +namespace impala { + +static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr); + +#define PAYLOAD_SIZE (4096) + +class RpcMgrTest : public testing::Test { + protected: + TNetworkAddress krpc_address_; + RpcMgr rpc_mgr_; + + virtual void SetUp() { + IpAddr ip; + ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); + krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT); + ASSERT_OK(rpc_mgr_.Init()); + } + + virtual void TearDown() { + rpc_mgr_.Shutdown(); + } + + // Utility function to initialize the parameter for ScanMem RPC. + // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar + // to 'controller'. Also sets up 'request' with the random value and index of the + // sidecar. + void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) { + int32_t pattern = random(); + for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern; + int idx; + Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE); + controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx); + request->set_pattern(pattern); + request->set_sidecar_idx(idx); + } + + private: + int32_t payload_[PAYLOAD_SIZE]; +}; + +typedef std::function<void(RpcContext*)> ServiceCB; + +class PingServiceImpl : public PingServiceIf { + public: + // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler. + PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity, + const scoped_refptr<kudu::rpc::ResultTracker> tracker, + ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); }) + : PingServiceIf(entity, tracker), cb_(cb) {} + + virtual void Ping( + const PingRequestPB* request, PingResponsePB* response, RpcContext* context) { + response->set_int_response(42); + cb_(context); + } + + private: + ServiceCB cb_; +}; + +class ScanMemServiceImpl : public ScanMemServiceIf { + public: + ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity, + const scoped_refptr<kudu::rpc::ResultTracker> tracker) + : ScanMemServiceIf(entity, tracker) { + } + + // The request comes with an int 'pattern' and a payload of int array sent with + // sidecar. Scan the array to make sure every element matches 'pattern'. + virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response, + RpcContext* context) { + int32_t pattern = request->pattern(); + Slice payload; + ASSERT_OK( + FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload))); + ASSERT_EQ(payload.size() % sizeof(int32_t), 0); + + const int32_t* v = reinterpret_cast<const int32_t*>(payload.data()); + for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) { + int32_t val = v[i]; + if (val != pattern) { + context->RespondFailure(kudu::Status::Corruption( + Substitute("Expecting $1; Found $2", pattern, val))); + return; + } + } + context->RespondSuccess(); + } +}; + +TEST_F(RpcMgrTest, MultipleServices) { + // Test that a service can be started, and will respond to requests. + unique_ptr<ServiceIf> ping_impl( + new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker())); + ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(ping_impl))); + + // Test that a second service, that verifies the RPC payload is not corrupted, + // can be started. + unique_ptr<ServiceIf> scan_mem_impl( + new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker())); + ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl))); + + FLAGS_num_acceptor_threads = 2; + FLAGS_num_reactor_threads = 10; + ASSERT_OK(rpc_mgr_.StartServices(krpc_address_)); + + unique_ptr<PingServiceProxy> ping_proxy; + ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &ping_proxy)); + + unique_ptr<ScanMemServiceProxy> scan_mem_proxy; + ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy)); + + RpcController controller; + srand(0); + // Randomly invoke either services to make sure a RpcMgr can host multiple + // services at the same time. + for (int i = 0; i < 100; ++i) { + controller.Reset(); + if (random() % 2 == 0) { + PingRequestPB request; + PingResponsePB response; + kudu::Status status = ping_proxy->Ping(request, &response, &controller); + ASSERT_TRUE(status.ok()); + ASSERT_EQ(response.int_response(), 42); + } else { + ScanMemRequestPB request; + ScanMemResponsePB response; + SetupScanMemRequest(&request, &controller); + kudu::Status status = scan_mem_proxy->ScanMem(request, &response, &controller); + ASSERT_TRUE(status.ok()); + } + } +} + +TEST_F(RpcMgrTest, SlowCallback) { + + // Use a callback which is slow to respond. + auto slow_cb = [](RpcContext* ctx) { + SleepForMs(300); + ctx->RespondSuccess(); + }; + + // Test a service which is slow to respond and has a short queue. + // Set a timeout on the client side. Expect either a client timeout + // or the service queue filling up. + unique_ptr<ServiceIf> impl( + new PingServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb)); + const int num_service_threads = 1; + const int queue_size = 3; + ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, move(impl))); + + FLAGS_num_acceptor_threads = 2; + FLAGS_num_reactor_threads = 10; + ASSERT_OK(rpc_mgr_.StartServices(krpc_address_)); + + unique_ptr<PingServiceProxy> proxy; + ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &proxy)); + + PingRequestPB request; + PingResponsePB response; + RpcController controller; + for (int i = 0; i < 100; ++i) { + controller.Reset(); + controller.set_timeout(MonoDelta::FromMilliseconds(50)); + kudu::Status status = proxy->Ping(request, &response, &controller); + ASSERT_TRUE(status.IsTimedOut() || RpcMgr::IsServerTooBusy(controller)); + } +} + +TEST_F(RpcMgrTest, AsyncCall) { + unique_ptr<ServiceIf> scan_mem_impl( + new ScanMemServiceImpl(rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker())); + ASSERT_OK(rpc_mgr_.RegisterService(10, 10, move(scan_mem_impl))); + + unique_ptr<ScanMemServiceProxy> scan_mem_proxy; + ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy)); + + FLAGS_num_acceptor_threads = 2; + FLAGS_num_reactor_threads = 10; + ASSERT_OK(rpc_mgr_.StartServices(krpc_address_)); + + RpcController controller; + srand(0); + for (int i = 0; i < 10; ++i) { + controller.Reset(); + ScanMemRequestPB request; + ScanMemResponsePB response; + SetupScanMemRequest(&request, &controller); + CountingBarrier barrier(1); + scan_mem_proxy->ScanMemAsync(request, &response, &controller, + [barrier_ptr = &barrier]() { barrier_ptr->Notify(); }); + // TODO: Inject random cancellation here. + barrier.Wait(); + ASSERT_TRUE(controller.status().ok()) << controller.status().ToString(); + } +} + +} // namespace impala + +IMPALA_TEST_MAIN(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr.cc b/be/src/rpc/rpc-mgr.cc new file mode 100644 index 0000000..f6491be --- /dev/null +++ b/be/src/rpc/rpc-mgr.cc @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "rpc/rpc-mgr.h" + +#include "exec/kudu-util.h" +#include "kudu/rpc/acceptor_pool.h" +#include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/service_if.h" +#include "kudu/util/net/net_util.h" +#include "util/cpu-info.h" +#include "util/network-util.h" + +#include "common/names.h" + +using kudu::rpc::MessengerBuilder; +using kudu::rpc::Messenger; +using kudu::rpc::AcceptorPool; +using kudu::rpc::RpcController; +using kudu::rpc::ServiceIf; +using kudu::rpc::ServicePool; +using kudu::Sockaddr; +using kudu::HostPort; +using kudu::MetricEntity; + +DECLARE_string(hostname); + +DEFINE_int32(num_acceptor_threads, 2, + "Number of threads dedicated to accepting connection requests for RPC services"); +DEFINE_int32(num_reactor_threads, 0, + "Number of threads dedicated to managing network IO for RPC services. If left at " + "default value 0, it will be set to number of CPU cores."); + +namespace impala { + +Status RpcMgr::Init() { + MessengerBuilder bld("impala-server"); + const scoped_refptr<MetricEntity> entity( + METRIC_ENTITY_server.Instantiate(®istry_, "krpc-metrics")); + int num_reactor_threads = + FLAGS_num_reactor_threads > 0 ? FLAGS_num_reactor_threads : CpuInfo::num_cores(); + bld.set_num_reactors(num_reactor_threads).set_metric_entity(entity); + KUDU_RETURN_IF_ERROR(bld.Build(&messenger_), "Could not build messenger"); + return Status::OK(); +} + +Status RpcMgr::RegisterService(int32_t num_service_threads, int32_t service_queue_depth, + unique_ptr<ServiceIf> service_ptr) { + DCHECK(is_inited()) << "Must call Init() before RegisterService()"; + DCHECK(!services_started_) << "Cannot call RegisterService() after StartServices()"; + scoped_refptr<ServicePool> service_pool = + new ServicePool(gscoped_ptr<ServiceIf>(service_ptr.release()), + messenger_->metric_entity(), service_queue_depth); + // Start the thread pool first before registering the service in case the startup fails. + KUDU_RETURN_IF_ERROR( + service_pool->Init(num_service_threads), "Service pool failed to start"); + KUDU_RETURN_IF_ERROR( + messenger_->RegisterService(service_pool->service_name(), service_pool), + "Could not register service"); + service_pools_.push_back(service_pool); + + return Status::OK(); +} + +Status RpcMgr::StartServices(const TNetworkAddress& address) { + DCHECK(is_inited()) << "Must call Init() before StartServices()"; + DCHECK(!services_started_) << "May not call StartServices() twice"; + + // Convert 'address' to Kudu's Sockaddr + DCHECK(IsResolvedAddress(address)); + Sockaddr sockaddr; + RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr)); + + // Call the messenger to create an AcceptorPool for us. + shared_ptr<AcceptorPool> acceptor_pool; + KUDU_RETURN_IF_ERROR(messenger_->AddAcceptorPool(sockaddr, &acceptor_pool), + "Failed to add acceptor pool"); + KUDU_RETURN_IF_ERROR(acceptor_pool->Start(FLAGS_num_acceptor_threads), + "Acceptor pool failed to start"); + VLOG_QUERY << "Started " << FLAGS_num_acceptor_threads << " acceptor threads"; + services_started_ = true; + return Status::OK(); +} + +void RpcMgr::Shutdown() { + if (messenger_.get() == nullptr) return; + for (auto service_pool : service_pools_) service_pool->Shutdown(); + + messenger_->UnregisterAllServices(); + messenger_->Shutdown(); + service_pools_.clear(); +} + +bool RpcMgr::IsServerTooBusy(const RpcController& rpc_controller) { + const kudu::Status status = rpc_controller.status(); + const kudu::rpc::ErrorStatusPB* err = rpc_controller.error_response(); + return status.IsRemoteError() && err != nullptr && err->has_code() && + err->code() == kudu::rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY; +} + +} // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.h ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr.h b/be/src/rpc/rpc-mgr.h new file mode 100644 index 0000000..d414bb6 --- /dev/null +++ b/be/src/rpc/rpc-mgr.h @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RPC_RPC_MGR_H +#define IMPALA_RPC_RPC_MGR_H + +#include "common/status.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/result_tracker.h" +#include "kudu/rpc/service_pool.h" +#include "kudu/util/metrics.h" + +#include "gen-cpp/Types_types.h" + +namespace kudu { +namespace rpc { +class RpcController; +class ServiceIf; +} // rpc +} // kudu + +namespace impala { + +/// Singleton class which manages all KRPC services and proxies. +/// +/// SERVICES +/// -------- +/// +/// An RpcMgr manages 0 or more services: RPC interfaces that are a collection of remotely +/// accessible methods. A new service is registered by calling RegisterService(). All +/// services are served on the same port; the underlying RPC layer takes care of +/// de-multiplexing RPC calls to their respective endpoints. +/// +/// Services are made available to remote clients when RpcMgr::StartServices() is called; +/// before this method no service method will be called. +/// +/// Services may only be registered and started after RpcMgr::Init() is called. +/// +/// PROXIES +/// ------- +/// +/// A proxy is a client-side interface to a remote service. Remote methods exported by +/// that service may be called through a proxy as though they were local methods. +/// +/// A proxy can be obtained by calling GetProxy(). Proxies implement local methods which +/// call remote service methods, e.g. proxy->Foo(request, &response) will call the Foo() +/// service method on the service that 'proxy' points to. +/// +/// Proxies may only be created after RpcMgr::Init() is called. +/// +/// For example usage of proxies, please see rpc-mgr-test.cc +/// +/// LIFECYCLE +/// --------- +/// +/// RpcMgr resides inside the singleton ExecEnv class. +/// +/// Before any proxy or service interactions, RpcMgr::Init() must be called exactly once +/// to start the reactor threads that service network events. Services must be registered +/// with RpcMgr::RegisterService() before RpcMgr::StartServices() is called. When shutting +/// down, RpcMgr::Shutdown() must be called to ensure that all services are cleanly +/// terminated. RpcMgr::Init() and RpcMgr::Shutdown() are not thread safe. +/// +/// KRPC INTERNALS +/// -------------- +/// +/// Each service and proxy interacts with the network via a shared pool of 'reactor' +/// threads which respond to incoming and outgoing RPC events. The number of 'reactor' +/// threads are configurable via FLAGS_reactor_thread. By default, it's set to the number +/// of cpu cores. Incoming events are passed immediately to one of two thread pools: new +/// connections are handled by an 'acceptor' pool, and RPC request events are handled by +/// a per-service 'service' pool. The size of a 'service' pool is specified when calling +/// RegisterService(). +/// +/// All incoming RPC requests are placed into a per-service pool's fixed-size queue. +/// The service threads will dequeue from this queue and process the requests. If the +/// queue becomes full, the RPC will fail at the caller. The function IsServerTooBusy() +/// below will return true for this case. The size of the queue is specified when calling +/// RegisterService(). +/// +/// Inbound connection set-up is handled by a small fixed-size pool of 'acceptor' +/// threads. The number of threads that accept new TCP connection requests to the service +/// port is configurable via FLAGS_acceptor_threads. +class RpcMgr { + public: + /// Initializes the reactor threads, and prepares for sending outbound RPC requests. + Status Init() WARN_UNUSED_RESULT; + + bool is_inited() const { return messenger_.get() != nullptr; } + + /// Start the acceptor threads which listen on 'address', making KRPC services + /// available. 'address' has to be a resolved IP address. Before this method is called, + /// remote clients will get a 'connection refused' error when trying to invoke an RPC + /// on this host. + Status StartServices(const TNetworkAddress& address) WARN_UNUSED_RESULT; + + /// Register a new service. + /// + /// 'num_service_threads' is the number of threads that should be started to execute RPC + /// handlers for the new service. + /// + /// 'service_queue_depth' is the maximum number of requests that may be queued for this + /// service before clients begin to see rejection errors. + /// + /// 'service_ptr' contains an interface implementation that will handle RPCs. Note that + /// the service name has to be unique within an Impala instance or the registration will + /// fail. + /// + /// It is an error to call this after StartServices() has been called. + Status RegisterService(int32_t num_service_threads, int32_t service_queue_depth, + std::unique_ptr<kudu::rpc::ServiceIf> service_ptr) WARN_UNUSED_RESULT; + + /// Creates a new proxy for a remote service of type P at location 'address', and places + /// it in 'proxy'. 'P' must descend from kudu::rpc::ServiceIf. Note that 'address' must + /// be a resolved IP address. + template <typename P> + Status GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy) + WARN_UNUSED_RESULT; + + /// Shut down all previously registered services. All service pools are shut down. + /// All acceptor and reactor threads within the messenger are also shut down. + /// All unprocessed incoming requests will be replied with error messages. + void Shutdown(); + + /// Returns true if the last RPC of 'rpc_controller' failed because the remote + /// service's queue filled up and couldn't accept more incoming requests. + /// 'rpc_controller' should contain the status of the last RPC call. + static bool IsServerTooBusy(const kudu::rpc::RpcController& rpc_controller); + + const scoped_refptr<kudu::rpc::ResultTracker> result_tracker() const { + return tracker_; + } + + scoped_refptr<kudu::MetricEntity> metric_entity() const { + return messenger_->metric_entity(); + } + + ~RpcMgr() { + DCHECK_EQ(service_pools_.size(), 0) + << "Must call Shutdown() before destroying RpcMgr"; + } + + private: + /// One pool per registered service. scoped_refptr<> is dictated by the Kudu interface. + std::vector<scoped_refptr<kudu::rpc::ServicePool>> service_pools_; + + /// Required Kudu boilerplate for constructing the MetricEntity passed + /// to c'tor of ServiceIf when creating a service. + /// TODO(KRPC): Integrate with Impala MetricGroup. + kudu::MetricRegistry registry_; + + /// Used when creating a new service. Shared across all services which don't really + /// track results for idempotent RPC calls. + const scoped_refptr<kudu::rpc::ResultTracker> tracker_; + + /// Container for reactor threads which run event loops for RPC services, plus acceptor + /// threads which manage connection setup. Has to be a shared_ptr as required by + /// MessangerBuilder::Build(). + std::shared_ptr<kudu::rpc::Messenger> messenger_; + + /// True after StartServices() completes. + bool services_started_ = false; +}; + +} // namespace impala + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/rpc/rpc-mgr.inline.h ---------------------------------------------------------------------- diff --git a/be/src/rpc/rpc-mgr.inline.h b/be/src/rpc/rpc-mgr.inline.h new file mode 100644 index 0000000..474ac45 --- /dev/null +++ b/be/src/rpc/rpc-mgr.inline.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RPC_RPC_MGR_INLINE_H +#define IMPALA_RPC_RPC_MGR_INLINE_H + +#include "rpc/rpc-mgr.h" + +#include "exec/kudu-util.h" +#include "kudu/rpc/messenger.h" +#include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/service_pool.h" +#include "util/network-util.h" + +namespace impala { + +/// Always inline to avoid having to provide a definition for each use type P. +template <typename P> +Status RpcMgr::GetProxy(const TNetworkAddress& address, std::unique_ptr<P>* proxy) { + DCHECK(proxy != nullptr); + DCHECK(is_inited()) << "Must call Init() before GetProxy()"; + DCHECK(IsResolvedAddress(address)); + kudu::Sockaddr sockaddr; + RETURN_IF_ERROR(TNetworkAddressToSockaddr(address, &sockaddr)); + proxy->reset(new P(messenger_, sockaddr, address.hostname)); + return Status::OK(); +} + +} // namespace impala + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 8942007..94d2ca6 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -28,6 +28,7 @@ #include "common/object-pool.h" #include "exec/kudu-util.h" #include "gen-cpp/ImpalaInternalService.h" +#include "rpc/rpc-mgr.h" #include "runtime/backend-client.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" @@ -168,10 +169,12 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port, async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), query_exec_mgr_(new QueryExecMgr()), enable_webserver_(FLAGS_enable_webserver && webserver_port > 0), - backend_address_(MakeNetworkAddress(hostname, backend_port)), - krpc_port_(krpc_port) { + backend_address_(MakeNetworkAddress(hostname, backend_port)) { if (FLAGS_use_krpc) { + // KRPC relies on resolved IP address. It's set in StartServices(). + krpc_address_.__set_port(krpc_port); + rpc_mgr_.reset(new RpcMgr()); stream_mgr_.reset(new KrpcDataStreamMgr(metrics_.get())); } else { stream_mgr_.reset(new DataStreamMgr(metrics_.get())); @@ -204,6 +207,7 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int krpc_port, ExecEnv::~ExecEnv() { if (buffer_reservation_ != nullptr) buffer_reservation_->Close(); + if (rpc_mgr_ != nullptr) rpc_mgr_->Shutdown(); disk_io_mgr_.reset(); // Need to tear down before mem_tracker_. } @@ -281,6 +285,15 @@ Status ExecEnv::Init() { RETURN_IF_ERROR(RegisterMemoryMetrics( metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get())); + // Resolve hostname to IP address. + RETURN_IF_ERROR(HostnameToIpAddr(backend_address_.hostname, &ip_address_)); + + // Initialize the RPCMgr before allowing services registration. + if (FLAGS_use_krpc) { + krpc_address_.__set_hostname(ip_address_); + RETURN_IF_ERROR(rpc_mgr_->Init()); + } + mem_tracker_.reset( new MemTracker(AggregateMemoryMetrics::TOTAL_USED, bytes_limit, "Process")); // Add BufferPool MemTrackers for cached memory that is not tracked against queries @@ -334,7 +347,7 @@ Status ExecEnv::Init() { } if (scheduler_ != nullptr) { - RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_port_)); + RETURN_IF_ERROR(scheduler_->Init(backend_address_, krpc_address_, ip_address_)); } if (admission_controller_ != nullptr) RETURN_IF_ERROR(admission_controller_->Init()); @@ -364,6 +377,8 @@ Status ExecEnv::StartServices() { } } + // Start this last so everything is in place before accepting the first call. + if (FLAGS_use_krpc) RETURN_IF_ERROR(rpc_mgr_->StartServices(krpc_address_)); return Status::OK(); } @@ -402,4 +417,4 @@ KrpcDataStreamMgr* ExecEnv::KrpcStreamMgr() { return dynamic_cast<KrpcDataStreamMgr*>(stream_mgr_.get()); } -} +} // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index b8a271d..31532df 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -27,9 +27,14 @@ #include "common/status.h" #include "runtime/client-cache-types.h" #include "util/hdfs-bulk-ops-defs.h" // For declaration of HdfsOpThreadPool +#include "util/network-util.h" #include "util/spinlock.h" -namespace kudu { namespace client { class KuduClient; } } +namespace kudu { +namespace client { +class KuduClient; +} // namespace client +} // namespace kudu namespace impala { @@ -53,6 +58,7 @@ class ObjectPool; class QueryResourceMgr; class RequestPoolService; class ReservationTracker; +class RpcMgr; class Scheduler; class StatestoreSubscriber; class ThreadResourceMgr; @@ -131,7 +137,9 @@ class ExecEnv { const TNetworkAddress& backend_address() const { return backend_address_; } - int krpc_port() const { return krpc_port_; } + const IpAddr& ip_address() const { return ip_address_; } + + const TNetworkAddress& krpc_address() const { return krpc_address_; } /// Initializes the exec env for running FE tests. Status InitForFeTests() WARN_UNUSED_RESULT; @@ -173,6 +181,7 @@ class ExecEnv { boost::scoped_ptr<CallableThreadPool> exec_rpc_thread_pool_; boost::scoped_ptr<CallableThreadPool> async_rpc_pool_; boost::scoped_ptr<QueryExecMgr> query_exec_mgr_; + boost::scoped_ptr<RpcMgr> rpc_mgr_; /// Query-wide buffer pool and the root reservation tracker for the pool. The /// reservation limit is equal to the maximum capacity of the pool. Created in @@ -191,11 +200,14 @@ class ExecEnv { static ExecEnv* exec_env_; bool is_fe_tests_ = false; - /// Address of the Impala backend server instance + /// Address of the thrift based ImpalaInternalService TNetworkAddress backend_address_; - /// Port number on which all KRPC-based services are exported. - int krpc_port_; + /// Resolved IP address of the host name. + IpAddr ip_address_; + + /// Address of the KRPC-based ImpalaInternalService + TNetworkAddress krpc_address_; /// fs.defaultFs value set in core-site.xml std::string default_fs_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler-test-util.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc index 2d85a9a..05cfc42 100644 --- a/be/src/scheduling/scheduler-test-util.cc +++ b/be/src/scheduling/scheduler-test-util.cc @@ -20,7 +20,6 @@ #include <boost/unordered_set.hpp> #include "common/names.h" -#include "runtime/exec-env.h" #include "scheduling/scheduler.h" using namespace impala; @@ -509,13 +508,14 @@ void SchedulerWrapper::InitializeScheduler() { << "hosts."; const Host& scheduler_host = plan_.cluster().hosts()[0]; string scheduler_backend_id = scheduler_host.ip; - TNetworkAddress scheduler_backend_address; - scheduler_backend_address.hostname = scheduler_host.ip; - scheduler_backend_address.port = scheduler_host.be_port; - + TNetworkAddress scheduler_backend_address = + MakeNetworkAddress(scheduler_host.ip, scheduler_host.be_port); + TNetworkAddress scheduler_krpc_address = + MakeNetworkAddress(scheduler_host.ip, FLAGS_krpc_port); scheduler_.reset(new Scheduler(nullptr, scheduler_backend_id, &metrics_, nullptr, nullptr)); - const Status status = scheduler_->Init(scheduler_backend_address, FLAGS_krpc_port); + const Status status = scheduler_->Init(scheduler_backend_address, + scheduler_krpc_address, scheduler_host.ip); DCHECK(status.ok()) << "Scheduler init failed in test"; // Initialize the scheduler backend maps. SendFullMembershipMap(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler.cc ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc index adac41f..5cf0f01 100644 --- a/be/src/scheduling/scheduler.cc +++ b/be/src/scheduling/scheduler.cc @@ -65,29 +65,20 @@ Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id, request_pool_service_(request_pool_service) { } -Status Scheduler::Init(const TNetworkAddress& backend_address, int krpc_port) { +Status Scheduler::Init(const TNetworkAddress& backend_address, + const TNetworkAddress& krpc_address, const IpAddr& ip) { LOG(INFO) << "Starting scheduler"; - - // Figure out what our IP address is, so that each subscriber doesn't have to resolve - // it on every heartbeat. KRPC also assumes that the address is resolved already. - // May as well do it up front to avoid frequent DNS requests. local_backend_descriptor_.address = backend_address; - IpAddr ip; - const Hostname& hostname = backend_address.hostname; - Status status = HostnameToIpAddr(hostname, &ip); - if (!status.ok()) { - VLOG(1) << status.GetDetail(); - status.AddDetail("Scheduler failed to start"); - return status; - } - + // Store our IP address so that each subscriber doesn't have to resolve + // it on every heartbeat. May as well do it up front to avoid frequent DNS + // requests. local_backend_descriptor_.ip_address = ip; LOG(INFO) << "Scheduler using " << ip << " as IP address"; - if (FLAGS_use_krpc) { - // KRPC expects address to have been resolved already. - TNetworkAddress krpc_svc_addr = MakeNetworkAddress(ip, krpc_port); - local_backend_descriptor_.__set_krpc_address(krpc_svc_addr); + // KRPC relies on resolved IP address. + DCHECK(IsResolvedAddress(krpc_address)); + DCHECK_EQ(krpc_address.hostname, ip); + local_backend_descriptor_.__set_krpc_address(krpc_address); } coord_only_backend_config_.AddBackend(local_backend_descriptor_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/scheduling/scheduler.h ---------------------------------------------------------------------- diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h index d1a22f8..2fe90b8 100644 --- a/be/src/scheduling/scheduler.h +++ b/be/src/scheduling/scheduler.h @@ -86,9 +86,11 @@ class Scheduler { /// decisions once this method returns. Register with the subscription manager if /// required. Also initializes the local backend descriptor. Returns error status /// on failure. 'backend_address' is the address of thrift based ImpalaInternalService - /// of this backend. 'krpc_port' is the port on which KRPC based ImpalaInternalService - /// is exported. - Status Init(const TNetworkAddress& backend_address, int krpc_port); + /// of this backend. If FLAGS_use_krpc is true, 'krpc_address' contains IP-address:port + /// on which KRPC based ImpalaInternalService is exported. 'ip' is the resolved + /// IP address of this backend. + Status Init(const TNetworkAddress& backend_address, + const TNetworkAddress& krpc_address, const IpAddr& ip); /// Populates given query schedule and assigns fragments to hosts based on scan /// ranges in the query exec request. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/service/impala-server.cc ---------------------------------------------------------------------- diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc index d9d2629..fabc8fa 100644 --- a/be/src/service/impala-server.cc +++ b/be/src/service/impala-server.cc @@ -1629,18 +1629,10 @@ void ImpalaServer::AddLocalBackendToStatestore( local_backend_descriptor.__set_is_coordinator(FLAGS_is_coordinator); local_backend_descriptor.__set_is_executor(FLAGS_is_executor); local_backend_descriptor.__set_address(exec_env_->backend_address()); - IpAddr ip; - const Hostname& hostname = local_backend_descriptor.address.hostname; - Status status = HostnameToIpAddr(hostname, &ip); - if (!status.ok()) { - // TODO: Should we do something about this failure? - LOG(WARNING) << "Failed to convert hostname " << hostname << " to IP address: " - << status.GetDetail(); - return; - } - local_backend_descriptor.ip_address = ip; + local_backend_descriptor.ip_address = exec_env_->ip_address(); if (FLAGS_use_krpc) { - TNetworkAddress krpc_address = MakeNetworkAddress(ip, exec_env_->krpc_port()); + const TNetworkAddress& krpc_address = exec_env_->krpc_address(); + DCHECK(IsResolvedAddress(krpc_address)); local_backend_descriptor.__set_krpc_address(krpc_address); } subscriber_topic_updates->emplace_back(TTopicDelta()); @@ -1650,7 +1642,7 @@ void ImpalaServer::AddLocalBackendToStatestore( TTopicItem& item = update.topic_entries.back(); item.key = local_backend_id; - status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value); + Status status = thrift_serializer_.Serialize(&local_backend_descriptor, &item.value); if (!status.ok()) { LOG(WARNING) << "Failed to serialize Impala backend descriptor for statestore topic:" << " " << status.GetDetail(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/counting-barrier.h ---------------------------------------------------------------------- diff --git a/be/src/util/counting-barrier.h b/be/src/util/counting-barrier.h index 76dbe35..49b0bde 100644 --- a/be/src/util/counting-barrier.h +++ b/be/src/util/counting-barrier.h @@ -18,6 +18,9 @@ #ifndef IMPALA_UTIL_COUNTING_BARRIER_H #define IMPALA_UTIL_COUNTING_BARRIER_H +#include "common/atomic.h" +#include "util/promise.h" + namespace impala { /// Allows clients to wait for the arrival of a fixed number of notifications before they http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/network-util.cc ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.cc b/be/src/util/network-util.cc index 1a9ce53..7a10965 100644 --- a/be/src/util/network-util.cc +++ b/be/src/util/network-util.cc @@ -28,6 +28,7 @@ #include <vector> #include <boost/algorithm/string.hpp> +#include "exec/kudu-util.h" #include "kudu/util/net/sockaddr.h" #include "util/debug-util.h" #include "util/error-util.h" @@ -212,4 +213,14 @@ int FindUnusedEphemeralPort(vector<int>* used_ports) { close(sockfd); return -1; } + +Status TNetworkAddressToSockaddr(const TNetworkAddress& address, + kudu::Sockaddr* sockaddr) { + DCHECK(IsResolvedAddress(address)); + KUDU_RETURN_IF_ERROR( + sockaddr->ParseString(TNetworkAddressToString(address), address.port), + "Failed to parse address to Kudu Sockaddr."); + return Status::OK(); +} + } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/be/src/util/network-util.h ---------------------------------------------------------------------- diff --git a/be/src/util/network-util.h b/be/src/util/network-util.h index e964f5c..5b108dc 100644 --- a/be/src/util/network-util.h +++ b/be/src/util/network-util.h @@ -20,6 +20,10 @@ #include "gen-cpp/Types_types.h" #include <vector> +namespace kudu { +class Sockaddr; +} // namespace kudu + namespace impala { /// Type to store hostnames, which can be rfc1123 hostnames or IPv4 addresses. @@ -64,6 +68,11 @@ bool IsWildcardAddress(const std::string& ipaddress); /// Utility method to print address as address:port std::string TNetworkAddressToString(const TNetworkAddress& address); +/// Utility method to convert TNetworkAddress to Kudu sock addr. +/// Note that 'address' has to contain a resolved IP address. +Status TNetworkAddressToSockaddr(const TNetworkAddress& address, + kudu::Sockaddr* sockaddr); + /// Prints a hostport as ipaddress:port std::ostream& operator<<(std::ostream& out, const TNetworkAddress& hostport); @@ -71,4 +80,5 @@ std::ostream& operator<<(std::ostream& out, const TNetworkAddress& hostport); /// a free ephemeral port can't be found after 100 tries. If 'used_ports' is non-NULL, /// does not select those ports and adds the selected port to 'used_ports'. int FindUnusedEphemeralPort(std::vector<int>* used_ports); -} + +} // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/cmake_modules/FindKRPC.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/FindKRPC.cmake b/cmake_modules/FindKRPC.cmake index 593edcd..1547c20 100644 --- a/cmake_modules/FindKRPC.cmake +++ b/cmake_modules/FindKRPC.cmake @@ -101,7 +101,9 @@ function(KRPC_GENERATE SRCS HDRS TGTS) # This custom target enforces that there's just one invocation of protoc # when there are multiple consumers of the generated files. The target name # must be unique; adding parts of the filename helps ensure this. - set(TGT_NAME ${REL_DIR}${FIL}) + # Adding the prefix "KRPC_" to avoid conflation with the input proto file + # when ninja is used. Otherwise, ninja will flag a false circular dependency. + set(TGT_NAME KRPC_${REL_DIR}${FIL}) string(REPLACE "/" "-" TGT_NAME ${TGT_NAME}) add_custom_target(${TGT_NAME} DEPENDS "${SERVICE_CC}" "${SERVICE_H}" http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/common/protobuf/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/common/protobuf/CMakeLists.txt b/common/protobuf/CMakeLists.txt new file mode 100644 index 0000000..4d5f121 --- /dev/null +++ b/common/protobuf/CMakeLists.txt @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +cmake_minimum_required(VERSION 2.6) + +set(PROTOBUF_OUTPUT_DIR ${CMAKE_SOURCE_DIR}/be/generated-sources/gen-cpp/) + +KRPC_GENERATE(RPC_TEST_PROTO_SRCS RPC_TEST_PROTO_HDRS + RPC_TEST_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR} + BINARY_ROOT ${PROTOBUF_OUTPUT_DIR} + PROTO_FILES rpc_test.proto) +add_custom_target(rpc_test_proto_tgt DEPENDS ${RPC_TEST_PROTO_TGTS}) +set(RPC_TEST_PROTO_SRCS ${RPC_TEST_PROTO_SRCS} PARENT_SCOPE) + +add_custom_target(proto-deps ALL DEPENDS token_proto rpc_header_proto) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/dd4c6be8/common/protobuf/rpc_test.proto ---------------------------------------------------------------------- diff --git a/common/protobuf/rpc_test.proto b/common/protobuf/rpc_test.proto new file mode 100644 index 0000000..fd22331 --- /dev/null +++ b/common/protobuf/rpc_test.proto @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package impala; + +// Definitions for service used for rpc-mgr-test. +message PingRequestPB { +} + +message PingResponsePB { + required int32 int_response = 1; +} + +service PingService { + rpc Ping(PingRequestPB) returns (PingResponsePB); +} + +message ScanMemRequestPB { + required int32 pattern = 1; + required int32 sidecar_idx = 2; +} + +message ScanMemResponsePB { +} + +service ScanMemService { + rpc ScanMem(ScanMemRequestPB) returns (ScanMemResponsePB); +}
