This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 66794ac09 KUDU-3595: Add interface to set rpc_max_message_size for C++
client
66794ac09 is described below
commit 66794ac0970a6313e7a081720424228fdf8a5617
Author: Ashwani Raina <[email protected]>
AuthorDate: Mon Jul 29 23:28:28 2024 +0530
KUDU-3595: Add interface to set rpc_max_message_size for C++ client
Kudu client's default capacity to hold RPC payload prove to be less
in cases where payload size goes beyond 50MB (default max size).
This patch adds a method to KuduClientBuilder that enables clients
to change the default max size for RPC message per application needs.
For server, value remains set to default i.e. rpc_max_message_size.
If the default value for server is changed by setting the flag, the
same value gets applied to the subsequent inbound transfer object.
Unit test (TestRpcSidecarWithSizeLimits) tests two scenarios:
1. RPC max message size at client is greater or equal to the size
of message being received from server - Completes successfully.
2. RPC max message size at client is less than the size of message
being received from server - Op fails with shutting down of client
connection and appropriate error message.
Change-Id: Ib8feb5ba92ea604442a643e3286944564e655fa6
Reviewed-on: http://gerrit.cloudera.org:8080/21622
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/client/client.cc | 8 +++++
src/kudu/client/client.h | 10 ++++++
src/kudu/client/client_builder-internal.cc | 6 ++++
src/kudu/client/client_builder-internal.h | 2 ++
src/kudu/rpc/connection.cc | 4 ++-
src/kudu/rpc/messenger.cc | 3 ++
src/kudu/rpc/messenger.h | 12 +++++++
src/kudu/rpc/rpc-test-base.h | 35 +++++++++++++++++-
src/kudu/rpc/rpc-test.cc | 57 ++++++++++++++++++++++++++++++
src/kudu/rpc/transfer.cc | 10 +++---
src/kudu/rpc/transfer.h | 2 +-
src/kudu/tools/kudu-tool-test.cc | 2 +-
12 files changed, 143 insertions(+), 8 deletions(-)
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index d270b2c03..5c28a401c 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -322,6 +322,11 @@ KuduClientBuilder&
KuduClientBuilder::trusted_certificate(const string& cert_pem
return *this;
}
+KuduClientBuilder& KuduClientBuilder::rpc_max_message_size(int64_t size) {
+ data_->rpc_max_message_size_ = size;
+ return *this;
+}
+
KuduClientBuilder& KuduClientBuilder::num_reactors(int num_reactors) {
data_->num_reactors_ = num_reactors;
return *this;
@@ -385,6 +390,9 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>*
client) {
builder.set_rpc_negotiation_timeout_ms(
data_->connection_negotiation_timeout_.ToMilliseconds());
}
+ if (data_->rpc_max_message_size_) {
+ builder.set_rpc_max_message_size(*data_->rpc_max_message_size_);
+ }
if (data_->num_reactors_) {
builder.set_num_reactors(*data_->num_reactors_);
}
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 953c6b90e..a7ce897aa 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -306,6 +306,16 @@ class KUDU_EXPORT KuduClientBuilder {
/// @return Reference to the updated object.
KuduClientBuilder& connection_negotiation_timeout(const MonoDelta& timeout);
+ /// Set the maximum size of RPC message.
+ ///
+ /// If not provided, the underlying messenger is created with reasonable
+ /// default (FLAGS_rpc_max_message_size).
+ ///
+ /// @param [in] size
+ /// Max size value to set.
+ /// @return Reference to the updated object.
+ KuduClientBuilder& rpc_max_message_size(int64_t size);
+
/// Set JWT (JSON Web Token) to authenticate the client to a server.
///
/// @note If both @c import_authentication_credentials and
diff --git a/src/kudu/client/client_builder-internal.cc
b/src/kudu/client/client_builder-internal.cc
index b3c027c41..0cd50942d 100644
--- a/src/kudu/client/client_builder-internal.cc
+++ b/src/kudu/client/client_builder-internal.cc
@@ -16,8 +16,13 @@
// under the License.
#include "kudu/client/client_builder-internal.h"
+
+#include <gflags/gflags_declare.h>
+
#include "kudu/client/replica_controller-internal.h"
+DECLARE_int64(rpc_max_message_size);
+
namespace kudu {
namespace client {
@@ -26,6 +31,7 @@ KuduClientBuilder::Data::Data()
: default_admin_operation_timeout_(MonoDelta::FromSeconds(30)),
default_rpc_timeout_(MonoDelta::FromSeconds(10)),
replica_visibility_(internal::ReplicaController::Visibility::VOTERS),
+ rpc_max_message_size_(FLAGS_rpc_max_message_size),
require_authentication_(false),
encryption_policy_(EncryptionPolicy::OPTIONAL) {
}
diff --git a/src/kudu/client/client_builder-internal.h
b/src/kudu/client/client_builder-internal.h
index 71a0f90f0..2183b55f1 100644
--- a/src/kudu/client/client_builder-internal.h
+++ b/src/kudu/client/client_builder-internal.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
+#include <cstdint>
#include <optional>
#include <string>
#include <vector>
@@ -41,6 +42,7 @@ class KuduClientBuilder::Data {
std::string authn_creds_;
std::string jwt_;
internal::ReplicaController::Visibility replica_visibility_;
+ std::optional<int64_t> rpc_max_message_size_;
std::optional<int> num_reactors_;
std::string sasl_protocol_name_;
bool require_authentication_;
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index dfb8ec79e..8bf6e0b1e 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -522,12 +522,14 @@ void Connection::ReadHandler(ev::io& /*watcher*/, int
revents) {
}
last_activity_time_ = reactor_thread_->cur_time();
+ const int64_t rpc_max_size =
reactor_thread_->reactor()->messenger()->rpc_max_message_size();
faststring extra_buf;
while (true) {
if (!inbound_) {
+ // Initialize the maximum RPC message size set by caller.
inbound_.reset(new InboundTransfer());
}
- Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
+ Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf,
rpc_max_size);
if (PREDICT_FALSE(!status.ok())) {
if (status.posix_code() == ESHUTDOWN) {
VLOG(1) << Substitute("$0 shut down by remote end", ToString());
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 2d337e1c2..15db7d09c 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -43,6 +43,7 @@
#include "kudu/rpc/sasl_common.h"
#include "kudu/rpc/server_negotiation.h"
#include "kudu/rpc/service_if.h"
+#include "kudu/rpc/transfer.h"
#include "kudu/security/tls_context.h"
#include "kudu/security/token_verifier.h"
#include "kudu/util/flags.h"
@@ -81,6 +82,7 @@ MessengerBuilder::MessengerBuilder(string name)
connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
acceptor_listen_backlog_(AcceptorPool::kDefaultListenBacklog),
num_reactors_(4),
+ rpc_max_message_size_(FLAGS_rpc_max_message_size),
min_negotiation_threads_(0),
max_negotiation_threads_(4),
coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
@@ -379,6 +381,7 @@ Messenger::Messenger(const MessengerBuilder& bld)
rpcz_store_(new RpczStore),
metric_entity_(bld.metric_entity_),
rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
+ rpc_max_message_size_(bld.rpc_max_message_size_),
hostname_(bld.hostname_),
sasl_proto_name_(bld.sasl_proto_name_),
keytab_file_(bld.keytab_file_),
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index d109fe228..74de24bbb 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -110,6 +110,12 @@ class MessengerBuilder {
return *this;
}
+ // Set the maximum size of RPC message for sending and receiving.
+ MessengerBuilder& set_rpc_max_message_size(int64_t rpc_max_message_size) {
+ rpc_max_message_size_ = rpc_max_message_size;
+ return *this;
+ }
+
// Set the minimum number of connection-negotiation threads that will be used
// to handle the blocking connection-negotiation step.
MessengerBuilder& set_min_negotiation_threads(int min_negotiation_threads) {
@@ -278,6 +284,7 @@ class MessengerBuilder {
MonoDelta connection_keepalive_time_;
int acceptor_listen_backlog_;
int num_reactors_;
+ int64_t rpc_max_message_size_;
int min_negotiation_threads_;
int max_negotiation_threads_;
MonoDelta coarse_timer_granularity_;
@@ -448,6 +455,8 @@ class Messenger {
return rpc_negotiation_timeout_ms_;
}
+ int64_t rpc_max_message_size() const { return rpc_max_message_size_; }
+
// The name of the node where this Messenger is running. The best case is
// FQDN retrieved using getaddrinfo(), but it might be just local hostname
// retrived by gethostname(). It can also be empty if Messenger has been
@@ -563,6 +572,9 @@ class Messenger {
// Timeout in milliseconds after which an incomplete connection negotiation
will timeout.
const int64_t rpc_negotiation_timeout_ms_;
+ // Maximum RPC message size (in bytes) set by MessengerBuilder.
+ int64_t rpc_max_message_size_;
+
// The name of the node where this messenger is running.
const std::string hostname_;
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index ae3a83bac..d7831ba5e 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -448,7 +448,8 @@ class RpcTestBase : public KuduTest {
const std::string& rpc_certificate_file = "",
const std::string& rpc_private_key_file = "",
const std::string& rpc_ca_certificate_file = "",
- const std::string& rpc_private_key_password_cmd = "")
{
+ const std::string& rpc_private_key_password_cmd = "",
+ int64_t rpc_max_message_size =
FLAGS_rpc_max_message_size) {
MessengerBuilder bld(name);
if (enable_ssl) {
@@ -461,6 +462,7 @@ class RpcTestBase : public KuduTest {
}
bld.set_num_reactors(n_reactors);
+ bld.set_rpc_max_message_size(rpc_max_message_size);
bld.set_connection_keepalive_time(MonoDelta::FromMilliseconds(keepalive_time_ms_));
if (keepalive_time_ms_ >= 0) {
// In order for the keepalive timing to be accurate, we need to scan
connections
@@ -524,6 +526,37 @@ static void DoTestSidecar(Proxy* p, int size1, int size2) {
CHECK_EQ(Slice(expected), second);
}
+static Status DoTestSidecarWithSizeLimits(Proxy* p, int size1, int size2) {
+ const uint32_t kSeed = 12345;
+
+ SendTwoStringsRequestPB req;
+ req.set_size1(size1);
+ req.set_size2(size2);
+ req.set_random_seed(kSeed);
+
+ SendTwoStringsResponsePB resp;
+ RpcController controller;
+ controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+ Status status =
p->SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName,
+ req, &resp, &controller);
+ if (status.ok()) {
+ Slice first = GetSidecarPointer(controller, resp.sidecar1(), size1);
+ Slice second = GetSidecarPointer(controller, resp.sidecar2(), size2);
+ Random rng(kSeed);
+ faststring expected;
+
+ expected.resize(size1);
+ RandomString(expected.data(), size1, &rng);
+ CHECK_EQ(Slice(expected), first);
+
+ expected.resize(size2);
+ RandomString(expected.data(), size2, &rng);
+ CHECK_EQ(Slice(expected), second);
+ }
+
+ return status;
+ }
+
static Status DoTestOutgoingSidecar(Proxy* p, int size1, int size2) {
return DoTestOutgoingSidecar(p, {std::string(size1, 'a'),
std::string(size2, 'b')});
}
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 704a27735..c14de85a6 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -23,6 +23,7 @@
#include <cstring>
#include <functional>
#include <limits>
+#include <map>
#include <memory>
#include <ostream>
#include <set>
@@ -98,6 +99,7 @@ DECLARE_int32(tcp_keepalive_probe_period_s);
DECLARE_int32(tcp_keepalive_retry_period_s);
DECLARE_int32(tcp_keepalive_retry_count);
+using std::map;
using std::tuple;
using std::shared_ptr;
using std::string;
@@ -878,6 +880,61 @@ TEST_P(TestRpc, TestTCPKeepalive) {
req, &resp, &controller));
}
+// Test that the RpcSidecar transfers the messages within RPC max message
+// size limit and errors out when limit is crossed.
+TEST_P(TestRpc, TestRpcSidecarWithSizeLimits) {
+ int64_t rpc_message_size = 30 * 1024 * 1024; // 30 MB
+ map<std::pair<int64_t, int64_t>, string> rpc_max_message_server_and_client;
+
+ // 1. Set the rpc max size to:
+ // Server: 50 MB,
+ // Client: 70 MB,
+ // so that client is able to accommodate the response size of 60 MB.
+ EmplaceIfNotPresent(&rpc_max_message_server_and_client,
+ std::make_pair((50 * 1024 * 1024), (70 * 1024 * 1024)),
+ "OK");
+
+ // 2. Set the rpc max size to:
+ // Server: 50 MB,
+ // Client: 20 MB,
+ // so that client rejects the inbound message of size 60 MB.
+ EmplaceIfNotPresent(&rpc_max_message_server_and_client,
+ std::make_pair((50 * 1024 * 1024), (20 * 1024 * 1024)),
+ "Network error: RPC frame had a length of");
+
+ for (auto const& rpc_max_message_size : rpc_max_message_server_and_client) {
+ // Set rpc_max_message_size.
+ int64_t server_rpc_max_size = rpc_max_message_size.first.first;
+ int64_t client_rpc_max_size = rpc_max_message_size.first.second;
+
+ // Set up server.
+ Sockaddr server_addr = bind_addr();
+
+ MessengerBuilder mb("TestRpc.TestRpcSidecarWithSizeLimits");
+ mb.set_rpc_max_message_size(server_rpc_max_size)
+ .set_metric_entity(metric_entity_);
+ if (enable_ssl()) mb.enable_inbound_tls();
+
+ shared_ptr<Messenger> messenger;
+ ASSERT_OK(mb.Build(&messenger));
+
+ ASSERT_OK(StartTestServerWithCustomMessenger(&server_addr, messenger,
enable_ssl()));
+
+ // Set up client.
+ shared_ptr<Messenger> client_messenger;
+ ASSERT_OK(CreateMessenger("Client", &client_messenger,
+ 1, enable_ssl(), "", "", "", "",
client_rpc_max_size));
+ Proxy p(client_messenger, server_addr, kRemoteHostName,
+ GenericCalculatorService::static_service_name());
+
+ Status status = DoTestSidecarWithSizeLimits(&p, rpc_message_size,
rpc_message_size);
+
+ // OK: If size of payload is within max rpc message size limit.
+ // Close connection: If size of payload is beyond max message size limit.
+ ASSERT_STR_CONTAINS(status.ToString(), rpc_max_message_size.second);
+ }
+}
+
// Test that the RpcSidecar transfers the expected messages.
TEST_P(TestRpc, TestRpcSidecar) {
// Set up server.
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index 54405c25f..2696a272a 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -99,7 +99,9 @@ InboundTransfer::InboundTransfer(faststring initial_buf)
buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size()));
}
-Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) {
+Status InboundTransfer::ReceiveBuffer(Socket* socket,
+ faststring* extra_4,
+ const int64_t rpc_max_message_size) {
static constexpr int kExtraReadLength = kMsgLengthPrefixLength;
if (total_length_ == 0) {
// We haven't yet parsed the message length. It's possible that the
@@ -127,10 +129,10 @@ Status InboundTransfer::ReceiveBuffer(Socket* socket,
faststring* extra_4) {
// The length prefix doesn't include its own 4 bytes, so we have to
// add that back in.
total_length_ = NetworkByteOrder::Load32(&buf_[0]) +
kMsgLengthPrefixLength;
- if (PREDICT_FALSE(total_length_ > FLAGS_rpc_max_message_size)) {
+ if (PREDICT_FALSE(total_length_ > rpc_max_message_size)) {
return Status::NetworkError(Substitute(
- "RPC frame had a length of $0, but we only support messages up to $1
bytes "
- "long.", total_length_, FLAGS_rpc_max_message_size));
+ "RPC frame had a length of $0, but we only support messages up to $1
bytes long.",
+ total_length_, rpc_max_message_size));
}
if (PREDICT_FALSE(total_length_ <= kMsgLengthPrefixLength)) {
return Status::NetworkError(
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index adf59b2a8..bcacff340 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -76,7 +76,7 @@ class InboundTransfer {
// after this call returns OK), up to 4 extra bytes may have been read
// from the socket and stored in 'extra_4'. In that case, any previous
content of
// 'extra_4' is replaced by this extra bytes.
- Status ReceiveBuffer(Socket* socket, faststring* extra_4);
+ Status ReceiveBuffer(Socket* socket, faststring* extra_4, int64_t
rpc_max_message_size);
// Return true if any bytes have yet been sent.
bool TransferStarted() const;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 186c71ebf..32522559f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -10069,7 +10069,7 @@ TEST_P(DownloadSuperblockInBatchTest,
TestDownloadSuperblockInBatch) {
"--rpc_max_message_size_enable_validation=false "
// Set --rpc_max_message_size very small, so it is easy for
the size of
// superblock over --rpc_max_message_size. It is used to
repeat the network
- // error, see line 9477.
+ // error, see 'else' condition below.
"--rpc_max_message_size=$5 "
// This flag and --rpc_max_message_size are in a group flag
validator, so
// it is also should be set a small value.