This is an automated email from the ASF dual-hosted git repository. todd pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit a4b28812f43e6b8a85e1d61b48a2dbba70d56d1b Author: Todd Lipcon <[email protected]> AuthorDate: Tue Mar 24 10:42:14 2020 -0700 rpc: relax limit on number of sidecars Following the example of 0a7787f6563ae316a4a6207a9244c19ba8a7731f this changes the sidecars for outbound calls to use a boost::container::small_vector instead of a fixed size array. This means we no longer need such a strict limit. This commit raises the limit to 10000 as a reasonable "can't see the need for more than this" number. This also fixes an issue where the underlying socket syscalls fail if the number of slices in the iovec list is more than IOV_MAX. The new test verifies the behavior. Change-Id: I8e53d5b50753e4aa57d885718ad5b24558636f82 Reviewed-on: http://gerrit.cloudera.org:8080/15547 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> --- src/kudu/rpc/inbound_call.cc | 3 +- src/kudu/rpc/inbound_call.h | 4 +-- src/kudu/rpc/outbound_call.cc | 3 +- src/kudu/rpc/outbound_call.h | 4 +-- src/kudu/rpc/rpc-test-base.h | 75 +++++++++++++++++++++---------------------- src/kudu/rpc/rpc-test.cc | 45 ++++++++++++++++++++------ src/kudu/rpc/rpc_sidecar.cc | 13 +++++--- src/kudu/rpc/rpc_sidecar.h | 10 +++--- src/kudu/rpc/rtest.proto | 17 +++++----- src/kudu/rpc/transfer.cc | 2 +- src/kudu/rpc/transfer.h | 2 +- 11 files changed, 104 insertions(+), 74 deletions(-) diff --git a/src/kudu/rpc/inbound_call.cc b/src/kudu/rpc/inbound_call.cc index 60fdbb2..179a1d3 100644 --- a/src/kudu/rpc/inbound_call.cc +++ b/src/kudu/rpc/inbound_call.cc @@ -21,6 +21,7 @@ #include <memory> #include <ostream> +#include <boost/container/vector.hpp> #include <glog/logging.h> #include <google/protobuf/message.h> #include <google/protobuf/message_lite.h> @@ -93,7 +94,7 @@ Status InboundCall::ParseFrom(unique_ptr<InboundTransfer> transfer) { } RETURN_NOT_OK(RpcSidecar::ParseSidecars( - header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_)); + header_.sidecar_offsets(), serialized_request_, 
&inbound_sidecar_slices_)); if (header_.sidecar_offsets_size() > 0) { // Trim the request to just the message serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0)); diff --git a/src/kudu/rpc/inbound_call.h b/src/kudu/rpc/inbound_call.h index 1a9d8fb..c651aaa 100644 --- a/src/kudu/rpc/inbound_call.h +++ b/src/kudu/rpc/inbound_call.h @@ -30,6 +30,7 @@ #include "kudu/gutil/ref_counted.h" #include "kudu/rpc/remote_method.h" #include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/rpc_sidecar.h" #include "kudu/rpc/service_if.h" #include "kudu/rpc/transfer.h" #include "kudu/util/faststring.h" @@ -55,7 +56,6 @@ class Connection; class DumpConnectionsRequestPB; class RemoteUser; class RpcCallInProgressPB; -class RpcSidecar; struct InboundCallTiming { MonoTime time_received; // Time the call was first accepted. @@ -264,7 +264,7 @@ class InboundCall { // Inbound sidecars from the request. The slices are views onto transfer_. There are as // many slices as header_.sidecar_offsets_size(). - Slice inbound_sidecar_slices_[TransferLimits::kMaxSidecars]; + SidecarSliceVector inbound_sidecar_slices_; // The trace buffer. scoped_refptr<Trace> trace_; diff --git a/src/kudu/rpc/outbound_call.cc b/src/kudu/rpc/outbound_call.cc index 5a0c1da..66557b9 100644 --- a/src/kudu/rpc/outbound_call.cc +++ b/src/kudu/rpc/outbound_call.cc @@ -26,6 +26,7 @@ #include <utility> #include <vector> +#include <boost/container/vector.hpp> #include <boost/function.hpp> #include <gflags/gflags.h> #include <google/protobuf/message.h> @@ -512,7 +513,7 @@ Status CallResponse::ParseFrom(unique_ptr<InboundTransfer> transfer) { // Use information from header to extract the payload slices. 
RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(), - serialized_response_, sidecar_slices_)); + serialized_response_, &sidecar_slices_)); if (header_.sidecar_offsets_size() > 0) { serialized_response_ = diff --git a/src/kudu/rpc/outbound_call.h b/src/kudu/rpc/outbound_call.h index eb36608..74db5ca 100644 --- a/src/kudu/rpc/outbound_call.h +++ b/src/kudu/rpc/outbound_call.h @@ -33,6 +33,7 @@ #include "kudu/rpc/remote_method.h" #include "kudu/rpc/response_callback.h" #include "kudu/rpc/rpc_header.pb.h" +#include "kudu/rpc/rpc_sidecar.h" #include "kudu/rpc/transfer.h" #include "kudu/util/faststring.h" #include "kudu/util/locks.h" @@ -55,7 +56,6 @@ class CallResponse; class DumpConnectionsRequestPB; class RpcCallInProgressPB; class RpcController; -class RpcSidecar; // Tracks the status of a call on the client side. // @@ -329,7 +329,7 @@ class CallResponse { Slice serialized_response_; // Slices of data for rpc sidecars. They point into memory owned by transfer_. - Slice sidecar_slices_[TransferLimits::kMaxSidecars]; + SidecarSliceVector sidecar_slices_; // The incoming transfer data - retained because serialized_response_ // and sidecar_slices_ refer into its data. 
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h index 59bfd41..c7c33f6 100644 --- a/src/kudu/rpc/rpc-test-base.h +++ b/src/kudu/rpc/rpc-test-base.h @@ -39,6 +39,7 @@ #include "kudu/rpc/service_if.h" #include "kudu/rpc/service_pool.h" #include "kudu/security/security-test-util.h" +#include "kudu/util/crc.h" #include "kudu/util/env.h" #include "kudu/util/faststring.h" #include "kudu/util/mem_tracker.h" @@ -70,8 +71,8 @@ using kudu::rpc_test::ExactlyOnceResponsePB; using kudu::rpc_test::FeatureFlags; using kudu::rpc_test::PanicRequestPB; using kudu::rpc_test::PanicResponsePB; -using kudu::rpc_test::PushTwoStringsRequestPB; -using kudu::rpc_test::PushTwoStringsResponsePB; +using kudu::rpc_test::PushStringsRequestPB; +using kudu::rpc_test::PushStringsResponsePB; using kudu::rpc_test::SendTwoStringsRequestPB; using kudu::rpc_test::SendTwoStringsResponsePB; using kudu::rpc_test::SleepRequestPB; @@ -93,7 +94,7 @@ class GenericCalculatorService : public ServiceIf { static const char *kAddMethodName; static const char *kSleepMethodName; static const char *kSleepWithSidecarMethodName; - static const char *kPushTwoStringsMethodName; + static const char *kPushStringsMethodName; static const char *kSendTwoStringsMethodName; static const char *kAddExactlyOnce; @@ -118,8 +119,8 @@ class GenericCalculatorService : public ServiceIf { DoSleepWithSidecar(incoming); } else if (incoming->remote_method().method_name() == kSendTwoStringsMethodName) { DoSendTwoStrings(incoming); - } else if (incoming->remote_method().method_name() == kPushTwoStringsMethodName) { - DoPushTwoStrings(incoming); + } else if (incoming->remote_method().method_name() == kPushStringsMethodName) { + DoPushStrings(incoming); } else { incoming->RespondFailure(ErrorStatusPB::ERROR_NO_SUCH_METHOD, Status::InvalidArgument("bad method")); @@ -179,28 +180,25 @@ class GenericCalculatorService : public ServiceIf { incoming->RespondSuccess(resp); } - void DoPushTwoStrings(InboundCall* incoming) { + 
static void DoPushStrings(InboundCall* incoming) { Slice param(incoming->serialized_request()); - PushTwoStringsRequestPB req; + PushStringsRequestPB req; if (!req.ParseFromArray(param.data(), param.size())) { LOG(FATAL) << "couldn't parse: " << param.ToDebugString(); } - Slice sidecar1; - CHECK_OK(incoming->GetInboundSidecar(req.sidecar1_idx(), &sidecar1)); - - Slice sidecar2; - CHECK_OK(incoming->GetInboundSidecar(req.sidecar2_idx(), &sidecar2)); - // Check that reading non-existant sidecars doesn't work. Slice tmp; - CHECK(!incoming->GetInboundSidecar(req.sidecar2_idx() + 2, &tmp).ok()); + CHECK(!incoming->GetInboundSidecar(req.sidecar_indexes_size() + 2, &tmp).ok()); + + PushStringsResponsePB resp; + for (const auto& sidecar_idx : req.sidecar_indexes()) { + Slice sidecar; + CHECK_OK(incoming->GetInboundSidecar(sidecar_idx, &sidecar)); - PushTwoStringsResponsePB resp; - resp.set_size1(sidecar1.size()); - resp.set_data1(reinterpret_cast<const char*>(sidecar1.data()), sidecar1.size()); - resp.set_size2(sidecar2.size()); - resp.set_data2(reinterpret_cast<const char*>(sidecar2.data()), sidecar2.size()); + resp.add_sizes(sidecar.size()); + resp.add_crcs(crc::Crc32c(sidecar.data(), sidecar.size())); + } // Drop the sidecars etc, just to confirm that it's safe to do so. 
CHECK_GT(incoming->GetTransferSize(), 0); @@ -405,7 +403,7 @@ const char *GenericCalculatorService::kFullServiceName = "kudu.rpc.GenericCalcul const char *GenericCalculatorService::kAddMethodName = "Add"; const char *GenericCalculatorService::kSleepMethodName = "Sleep"; const char *GenericCalculatorService::kSleepWithSidecarMethodName = "SleepWithSidecar"; -const char *GenericCalculatorService::kPushTwoStringsMethodName = "PushTwoStrings"; +const char *GenericCalculatorService::kPushStringsMethodName = "PushStrings"; const char *GenericCalculatorService::kSendTwoStringsMethodName = "SendTwoStrings"; const char *GenericCalculatorService::kAddExactlyOnce = "AddExactlyOnce"; @@ -511,28 +509,29 @@ class RpcTestBase : public KuduTest { CHECK_EQ(0, second.compare(Slice(expected))); } - Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) { - PushTwoStringsRequestPB request; - RpcController controller; - - int idx1; - std::string s1(size1, 'a'); - CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s1)), &idx1)); + static Status DoTestOutgoingSidecar(const Proxy &p, int size1, int size2) { + return DoTestOutgoingSidecar(p, {std::string(size1, 'a'), std::string(size2, 'b')}); + } - int idx2; - std::string s2(size2, 'b'); - CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s2)), &idx2)); + static Status DoTestOutgoingSidecar(const Proxy& p, const std::vector<std::string>& strings) { + PushStringsRequestPB request; + RpcController controller; - request.set_sidecar1_idx(idx1); - request.set_sidecar2_idx(idx2); + for (const auto& s : strings) { + int idx; + CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(s)), &idx)); + request.add_sidecar_indexes(idx); + } - PushTwoStringsResponsePB resp; - KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + PushStringsResponsePB resp; + KUDU_RETURN_NOT_OK(p.SyncRequest(GenericCalculatorService::kPushStringsMethodName, request, &resp, 
&controller)); - CHECK_EQ(size1, resp.size1()); - CHECK_EQ(resp.data1(), s1); - CHECK_EQ(size2, resp.size2()); - CHECK_EQ(resp.data2(), s2); + for (int i = 0; i < strings.size(); i++) { + CHECK_EQ(strings[i].size(), resp.sizes(i)); + CHECK_EQ(crc::Crc32c(strings[i].data(), strings[i].size()), + resp.crcs(i)); + } + return Status::OK(); } diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc index 58f0f16..005b660 100644 --- a/src/kudu/rpc/rpc-test.cc +++ b/src/kudu/rpc/rpc-test.cc @@ -19,6 +19,7 @@ #include <cstdint> #include <cstdlib> #include <cstring> +#include <functional> #include <limits> #include <memory> #include <ostream> @@ -47,6 +48,7 @@ #include "kudu/rpc/reactor.h" #include "kudu/rpc/rpc-test-base.h" #include "kudu/rpc/rpc_controller.h" +#include "kudu/rpc/rpc_header.pb.h" #include "kudu/rpc/rpc_introspection.pb.h" #include "kudu/rpc/rpc_sidecar.h" #include "kudu/rpc/rtest.pb.h" @@ -60,6 +62,7 @@ #include "kudu/util/net/sockaddr.h" #include "kudu/util/net/socket.h" #include "kudu/util/random.h" +#include "kudu/util/random_util.h" #include "kudu/util/scoped_cleanup.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" @@ -840,6 +843,29 @@ TEST_P(TestRpc, TestRpcSidecar) { DoTestOutgoingSidecarExpectOK(p, 3000 * 1024, 2000 * 1024); } +// Test sending the maximum number of sidecars, each of them being a single +// character. This makes sure we handle the limit of IOV_MAX iovecs per sendmsg +// call. +TEST_P(TestRpc, TestMaxSmallSidecars) { + // Set up server. + Sockaddr server_addr; + bool enable_ssl = GetParam(); + ASSERT_OK(StartTestServer(&server_addr, enable_ssl)); + + // Set up client. 
+ shared_ptr<Messenger> client_messenger; + ASSERT_OK(CreateMessenger("Client", &client_messenger, 1, GetParam())); + Proxy p(client_messenger, server_addr, server_addr.host(), + GenericCalculatorService::static_service_name()); + + Random rng(GetRandomSeed32()); + vector<string> strings(TransferLimits::kMaxSidecars); + for (auto& s : strings) { + s = RandomString(2, &rng); + } + ASSERT_OK(DoTestOutgoingSidecar(p, strings)); +} + TEST_P(TestRpc, TestRpcSidecarLimits) { { // Test that the limits on the number of sidecars is respected. @@ -907,11 +933,10 @@ TEST_P(TestRpc, TestRpcSidecarLimits) { int idx; ASSERT_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(Slice(max_string)), &idx)); - PushTwoStringsRequestPB request; - request.set_sidecar1_idx(idx); - request.set_sidecar2_idx(idx); - PushTwoStringsResponsePB resp; - Status status = p.SyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + PushStringsRequestPB request; + request.add_sidecar_indexes(idx); + PushStringsResponsePB resp; + Status status = p.SyncRequest(GenericCalculatorService::kPushStringsMethodName, request, &resp, &controller); ASSERT_TRUE(status.IsNetworkError()) << "Unexpected error: " << status.ToString(); // Remote responds to extra-large payloads by closing the connection. 
@@ -1423,16 +1448,16 @@ static void SendAndCancelRpcs(Proxy* p, const Slice& slice) { int i = 0; while (MonoTime::Now() < end_time) { controller.Reset(); - PushTwoStringsRequestPB request; - PushTwoStringsResponsePB resp; + PushStringsRequestPB request; + PushStringsResponsePB resp; int idx; CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx)); - request.set_sidecar1_idx(idx); + request.add_sidecar_indexes(idx); CHECK_OK(controller.AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx)); - request.set_sidecar2_idx(idx); + request.add_sidecar_indexes(idx); CountDownLatch latch(1); - p->AsyncRequest(GenericCalculatorService::kPushTwoStringsMethodName, + p->AsyncRequest(GenericCalculatorService::kPushStringsMethodName, request, &resp, &controller, boost::bind(&CountDownLatch::CountDown, boost::ref(latch))); diff --git a/src/kudu/rpc/rpc_sidecar.cc b/src/kudu/rpc/rpc_sidecar.cc index dbeb845..da7244d 100644 --- a/src/kudu/rpc/rpc_sidecar.cc +++ b/src/kudu/rpc/rpc_sidecar.cc @@ -17,11 +17,12 @@ #include "kudu/rpc/rpc_sidecar.h" +#include <algorithm> #include <cstdint> #include <memory> -#include <utility> #include <vector> +#include <boost/container/vector.hpp> #include <google/protobuf/repeated_field.h> #include "kudu/gutil/strings/substitute.h" @@ -89,8 +90,9 @@ unique_ptr<RpcSidecar> RpcSidecar::FromSlice(Slice slice) { Status RpcSidecar::ParseSidecars( const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets, - Slice buffer, Slice* sidecars) { - if (offsets.size() == 0) return Status::OK(); + Slice buffer, + SidecarSliceVector* sidecars) { + if (offsets.empty()) return Status::OK(); int last = offsets.size() - 1; if (last >= TransferLimits::kMaxSidecars) { @@ -105,6 +107,7 @@ Status RpcSidecar::ParseSidecars( buffer.size(), TransferLimits::kMaxTotalSidecarBytes)); } + sidecars->resize(offsets.size()); for (int i = 0; i < last; ++i) { int64_t cur_offset = offsets.Get(i); int64_t next_offset = offsets.Get(i + 1); @@ -120,7 
+123,7 @@ Status RpcSidecar::ParseSidecars( " but ends before that at offset $1.", i, cur_offset, next_offset)); } - sidecars[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset); + (*sidecars)[i] = Slice(buffer.data() + cur_offset, next_offset - cur_offset); } int64_t cur_offset = offsets.Get(last); @@ -129,7 +132,7 @@ Status RpcSidecar::ParseSidecars( "starts at offset $1after message ends (message length $2).", last, cur_offset, buffer.size())); } - sidecars[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset); + (*sidecars)[last] = Slice(buffer.data() + cur_offset, buffer.size() - cur_offset); return Status::OK(); } diff --git a/src/kudu/rpc/rpc_sidecar.h b/src/kudu/rpc/rpc_sidecar.h index 3fccc5b..6cb3449 100644 --- a/src/kudu/rpc/rpc_sidecar.h +++ b/src/kudu/rpc/rpc_sidecar.h @@ -20,6 +20,7 @@ #include <memory> #include <vector> +#include <boost/container/small_vector.hpp> #include <google/protobuf/repeated_field.h> // IWYU pragma: keep #include <google/protobuf/stubs/port.h> @@ -33,6 +34,8 @@ class faststring; namespace rpc { +typedef boost::container::small_vector<Slice, 2> SidecarSliceVector; + // An RpcSidecar is a mechanism which allows replies to RPCs to reference blocks of data // without extra copies. In other words, whenever a protobuf would have a large field // where additional copies become expensive, one may opt instead to use an RpcSidecar. @@ -58,12 +61,11 @@ class RpcSidecar { static std::unique_ptr<RpcSidecar> FromSlice(Slice slice); // Utility method to parse a series of sidecar slices into 'sidecars' from 'buffer' and - // a set of offsets. 'sidecars' must have length >= TransferLimits::kMaxSidecars, and - // will be filled from index 0. - // TODO(henryr): Consider a vector instead here if there's no perf. impact. + // a set of offsets. 
static Status ParseSidecars( const ::google::protobuf::RepeatedField<::google::protobuf::uint32>& offsets, - Slice buffer, Slice* sidecars); + Slice buffer, + SidecarSliceVector* sidecars); // Append Slice representation of the sidecar's data to the given payload. // diff --git a/src/kudu/rpc/rtest.proto b/src/kudu/rpc/rtest.proto index d212cef..852bc6b 100644 --- a/src/kudu/rpc/rtest.proto +++ b/src/kudu/rpc/rtest.proto @@ -76,17 +76,16 @@ message SendTwoStringsResponsePB { required uint32 sidecar2 = 2; } -// Push two strings to the server as part of the request, in sidecars. -message PushTwoStringsRequestPB { - required uint32 sidecar1_idx = 1; - required uint32 sidecar2_idx = 2; +// Push strings to the server as part of the request, in sidecars. +message PushStringsRequestPB { + repeated uint32 sidecar_indexes = 1; } -message PushTwoStringsResponsePB { - required uint32 size1 = 1; - required string data1 = 2; - required uint32 size2 = 3; - required string data2 = 4; +// The server responds with the sizes and checksums of the sidecars that +// it received. 
+message PushStringsResponsePB { + repeated uint32 sizes = 1; + repeated uint32 crcs = 2; } message EchoRequestPB { diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc index 7693a91..422e448 100644 --- a/src/kudu/rpc/transfer.cc +++ b/src/kudu/rpc/transfer.cc @@ -225,7 +225,7 @@ Status OutboundTransfer::SendBuffer(Socket &socket) { CHECK_LT(cur_slice_idx_, payload_slices_.size()); started_ = true; - int n_iovecs = payload_slices_.size() - cur_slice_idx_; + int n_iovecs = std::min<int>(payload_slices_.size() - cur_slice_idx_, IOV_MAX); struct iovec iovec[n_iovecs]; { int offset_in_slice = cur_offset_in_slice_; diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h index 3cab6b1..91135d6 100644 --- a/src/kudu/rpc/transfer.h +++ b/src/kudu/rpc/transfer.h @@ -44,7 +44,7 @@ struct TransferCallbacks; class TransferLimits { public: enum { - kMaxSidecars = 10, + kMaxSidecars = 10000, kMaxTotalSidecarBytes = INT_MAX };
