This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.18.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.18.x by this push:
     new 351ae9bfd KUDU-3595: Add interface to set rpc_max_message_size for C++ 
client
351ae9bfd is described below

commit 351ae9bfdbb7eb10165454e64f21c189ea776cbc
Author: Ashwani Raina <[email protected]>
AuthorDate: Mon Jul 29 23:28:28 2024 +0530

    KUDU-3595: Add interface to set rpc_max_message_size for C++ client
    
    Kudu client's default capacity to hold RPC payload prove to be less
    in cases where payload size goes beyond 50MB (default max size).
    This patch adds a method to KuduClientBuilder that enables clients
    to change the default max size for RPC message per application needs.
    For server, value remains set to default i.e. rpc_max_message_size.
    If the default value for server is changed by setting the flag, the
    same value gets applied to the subsequent inbound transfer object.
    
    Unit test (TestRpcSidecarWithSizeLimits) tests two scenarios:
    1. RPC max message size at client is greater or equal to the size
       of message being received from server - Completes successfully.
    2. RPC max message size at client is less than the size of message
       being received from server - Op fails with shutting down of client
       connection and appropriate error message.
    
    Change-Id: Ib8feb5ba92ea604442a643e3286944564e655fa6
    Reviewed-on: http://gerrit.cloudera.org:8080/21622
    Reviewed-by: Alexey Serbin <[email protected]>
    Tested-by: Alexey Serbin <[email protected]>
    (cherry picked from commit 66794ac0970a6313e7a081720424228fdf8a5617)
    Reviewed-on: http://gerrit.cloudera.org:8080/22002
    Reviewed-by: Abhishek Chennaka <[email protected]>
---
 src/kudu/client/client.cc                  |  8 +++++
 src/kudu/client/client.h                   | 10 ++++++
 src/kudu/client/client_builder-internal.cc |  6 ++++
 src/kudu/client/client_builder-internal.h  |  2 ++
 src/kudu/rpc/connection.cc                 |  4 ++-
 src/kudu/rpc/messenger.cc                  |  3 ++
 src/kudu/rpc/messenger.h                   | 12 +++++++
 src/kudu/rpc/rpc-test-base.h               | 35 +++++++++++++++++-
 src/kudu/rpc/rpc-test.cc                   | 57 ++++++++++++++++++++++++++++++
 src/kudu/rpc/transfer.cc                   | 10 +++---
 src/kudu/rpc/transfer.h                    |  2 +-
 src/kudu/tools/kudu-tool-test.cc           |  2 +-
 12 files changed, 143 insertions(+), 8 deletions(-)

diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index d270b2c03..5c28a401c 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -322,6 +322,11 @@ KuduClientBuilder& 
KuduClientBuilder::trusted_certificate(const string& cert_pem
   return *this;
 }
 
+KuduClientBuilder& KuduClientBuilder::rpc_max_message_size(int64_t size) {
+  data_->rpc_max_message_size_ = size;
+  return *this;
+}
+
 KuduClientBuilder& KuduClientBuilder::num_reactors(int num_reactors) {
   data_->num_reactors_ = num_reactors;
   return *this;
@@ -385,6 +390,9 @@ Status KuduClientBuilder::Build(shared_ptr<KuduClient>* 
client) {
     builder.set_rpc_negotiation_timeout_ms(
         data_->connection_negotiation_timeout_.ToMilliseconds());
   }
+  if (data_->rpc_max_message_size_) {
+    builder.set_rpc_max_message_size(*data_->rpc_max_message_size_);
+  }
   if (data_->num_reactors_) {
     builder.set_num_reactors(*data_->num_reactors_);
   }
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 953c6b90e..a7ce897aa 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -306,6 +306,16 @@ class KUDU_EXPORT KuduClientBuilder {
   /// @return Reference to the updated object.
   KuduClientBuilder& connection_negotiation_timeout(const MonoDelta& timeout);
 
+  /// Set the maximum size of RPC message.
+  ///
+  /// If not provided, the underlying messenger is created with reasonable
+  /// default (FLAGS_rpc_max_message_size).
+  ///
+  /// @param [in] size
+  ///   Max size value to set.
+  /// @return Reference to the updated object.
+  KuduClientBuilder& rpc_max_message_size(int64_t size);
+
   /// Set JWT (JSON Web Token) to authenticate the client to a server.
   ///
   /// @note If both @c import_authentication_credentials and
diff --git a/src/kudu/client/client_builder-internal.cc 
b/src/kudu/client/client_builder-internal.cc
index b3c027c41..0cd50942d 100644
--- a/src/kudu/client/client_builder-internal.cc
+++ b/src/kudu/client/client_builder-internal.cc
@@ -16,8 +16,13 @@
 // under the License.
 
 #include "kudu/client/client_builder-internal.h"
+
+#include <gflags/gflags_declare.h>
+
 #include "kudu/client/replica_controller-internal.h"
 
+DECLARE_int64(rpc_max_message_size);
+
 namespace kudu {
 
 namespace client {
@@ -26,6 +31,7 @@ KuduClientBuilder::Data::Data()
     : default_admin_operation_timeout_(MonoDelta::FromSeconds(30)),
       default_rpc_timeout_(MonoDelta::FromSeconds(10)),
       replica_visibility_(internal::ReplicaController::Visibility::VOTERS),
+      rpc_max_message_size_(FLAGS_rpc_max_message_size),
       require_authentication_(false),
       encryption_policy_(EncryptionPolicy::OPTIONAL) {
   }
diff --git a/src/kudu/client/client_builder-internal.h 
b/src/kudu/client/client_builder-internal.h
index 71a0f90f0..2183b55f1 100644
--- a/src/kudu/client/client_builder-internal.h
+++ b/src/kudu/client/client_builder-internal.h
@@ -16,6 +16,7 @@
 // under the License.
 #pragma once
 
+#include <cstdint>
 #include <optional>
 #include <string>
 #include <vector>
@@ -41,6 +42,7 @@ class KuduClientBuilder::Data {
   std::string authn_creds_;
   std::string jwt_;
   internal::ReplicaController::Visibility replica_visibility_;
+  std::optional<int64_t> rpc_max_message_size_;
   std::optional<int> num_reactors_;
   std::string sasl_protocol_name_;
   bool require_authentication_;
diff --git a/src/kudu/rpc/connection.cc b/src/kudu/rpc/connection.cc
index dfb8ec79e..8bf6e0b1e 100644
--- a/src/kudu/rpc/connection.cc
+++ b/src/kudu/rpc/connection.cc
@@ -522,12 +522,14 @@ void Connection::ReadHandler(ev::io& /*watcher*/, int 
revents) {
   }
   last_activity_time_ = reactor_thread_->cur_time();
 
+  const int64_t rpc_max_size = 
reactor_thread_->reactor()->messenger()->rpc_max_message_size();
   faststring extra_buf;
   while (true) {
     if (!inbound_) {
+      // Initialize the maximum RPC message size set by caller.
       inbound_.reset(new InboundTransfer());
     }
-    Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf);
+    Status status = inbound_->ReceiveBuffer(socket_.get(), &extra_buf, 
rpc_max_size);
     if (PREDICT_FALSE(!status.ok())) {
       if (status.posix_code() == ESHUTDOWN) {
         VLOG(1) << Substitute("$0 shut down by remote end", ToString());
diff --git a/src/kudu/rpc/messenger.cc b/src/kudu/rpc/messenger.cc
index 2d337e1c2..15db7d09c 100644
--- a/src/kudu/rpc/messenger.cc
+++ b/src/kudu/rpc/messenger.cc
@@ -43,6 +43,7 @@
 #include "kudu/rpc/sasl_common.h"
 #include "kudu/rpc/server_negotiation.h"
 #include "kudu/rpc/service_if.h"
+#include "kudu/rpc/transfer.h"
 #include "kudu/security/tls_context.h"
 #include "kudu/security/token_verifier.h"
 #include "kudu/util/flags.h"
@@ -81,6 +82,7 @@ MessengerBuilder::MessengerBuilder(string name)
       connection_keepalive_time_(MonoDelta::FromMilliseconds(65000)),
       acceptor_listen_backlog_(AcceptorPool::kDefaultListenBacklog),
       num_reactors_(4),
+      rpc_max_message_size_(FLAGS_rpc_max_message_size),
       min_negotiation_threads_(0),
       max_negotiation_threads_(4),
       coarse_timer_granularity_(MonoDelta::FromMilliseconds(100)),
@@ -379,6 +381,7 @@ Messenger::Messenger(const MessengerBuilder& bld)
       rpcz_store_(new RpczStore),
       metric_entity_(bld.metric_entity_),
       rpc_negotiation_timeout_ms_(bld.rpc_negotiation_timeout_ms_),
+      rpc_max_message_size_(bld.rpc_max_message_size_),
       hostname_(bld.hostname_),
       sasl_proto_name_(bld.sasl_proto_name_),
       keytab_file_(bld.keytab_file_),
diff --git a/src/kudu/rpc/messenger.h b/src/kudu/rpc/messenger.h
index d109fe228..74de24bbb 100644
--- a/src/kudu/rpc/messenger.h
+++ b/src/kudu/rpc/messenger.h
@@ -110,6 +110,12 @@ class MessengerBuilder {
     return *this;
   }
 
+  // Set the maximum size of RPC message for sending and receiving.
+  MessengerBuilder& set_rpc_max_message_size(int64_t rpc_max_message_size) {
+    rpc_max_message_size_ = rpc_max_message_size;
+    return *this;
+  }
+
   // Set the minimum number of connection-negotiation threads that will be used
   // to handle the blocking connection-negotiation step.
   MessengerBuilder& set_min_negotiation_threads(int min_negotiation_threads) {
@@ -278,6 +284,7 @@ class MessengerBuilder {
   MonoDelta connection_keepalive_time_;
   int acceptor_listen_backlog_;
   int num_reactors_;
+  int64_t rpc_max_message_size_;
   int min_negotiation_threads_;
   int max_negotiation_threads_;
   MonoDelta coarse_timer_granularity_;
@@ -448,6 +455,8 @@ class Messenger {
     return rpc_negotiation_timeout_ms_;
   }
 
+  int64_t rpc_max_message_size() const { return rpc_max_message_size_; }
+
   // The name of the node where this Messenger is running. The best case is
   // FQDN retrieved using getaddrinfo(), but it might be just local hostname
   // retrived by gethostname(). It can also be empty if Messenger has been
@@ -563,6 +572,9 @@ class Messenger {
   // Timeout in milliseconds after which an incomplete connection negotiation 
will timeout.
   const int64_t rpc_negotiation_timeout_ms_;
 
+  // Maximum RPC message size (in bytes) set by MessengerBuilder.
+  int64_t rpc_max_message_size_;
+
   // The name of the node where this messenger is running.
   const std::string hostname_;
 
diff --git a/src/kudu/rpc/rpc-test-base.h b/src/kudu/rpc/rpc-test-base.h
index ae3a83bac..d7831ba5e 100644
--- a/src/kudu/rpc/rpc-test-base.h
+++ b/src/kudu/rpc/rpc-test-base.h
@@ -448,7 +448,8 @@ class RpcTestBase : public KuduTest {
                          const std::string& rpc_certificate_file = "",
                          const std::string& rpc_private_key_file = "",
                          const std::string& rpc_ca_certificate_file = "",
-                         const std::string& rpc_private_key_password_cmd = "") 
{
+                         const std::string& rpc_private_key_password_cmd = "",
+                         int64_t rpc_max_message_size = 
FLAGS_rpc_max_message_size) {
     MessengerBuilder bld(name);
 
     if (enable_ssl) {
@@ -461,6 +462,7 @@ class RpcTestBase : public KuduTest {
     }
 
     bld.set_num_reactors(n_reactors);
+    bld.set_rpc_max_message_size(rpc_max_message_size);
     
bld.set_connection_keepalive_time(MonoDelta::FromMilliseconds(keepalive_time_ms_));
     if (keepalive_time_ms_ >= 0) {
       // In order for the keepalive timing to be accurate, we need to scan 
connections
@@ -524,6 +526,37 @@ static void DoTestSidecar(Proxy* p, int size1, int size2) {
     CHECK_EQ(Slice(expected), second);
   }
 
+static Status DoTestSidecarWithSizeLimits(Proxy* p, int size1, int size2) {
+    const uint32_t kSeed = 12345;
+
+    SendTwoStringsRequestPB req;
+    req.set_size1(size1);
+    req.set_size2(size2);
+    req.set_random_seed(kSeed);
+
+    SendTwoStringsResponsePB resp;
+    RpcController controller;
+    controller.set_timeout(MonoDelta::FromMilliseconds(10000));
+    Status status = 
p->SyncRequest(GenericCalculatorService::kSendTwoStringsMethodName,
+                                   req, &resp, &controller);
+    if (status.ok()) {
+      Slice first = GetSidecarPointer(controller, resp.sidecar1(), size1);
+      Slice second = GetSidecarPointer(controller, resp.sidecar2(), size2);
+      Random rng(kSeed);
+      faststring expected;
+
+      expected.resize(size1);
+      RandomString(expected.data(), size1, &rng);
+      CHECK_EQ(Slice(expected), first);
+
+      expected.resize(size2);
+      RandomString(expected.data(), size2, &rng);
+      CHECK_EQ(Slice(expected), second);
+    }
+
+    return status;
+  }
+
   static Status DoTestOutgoingSidecar(Proxy* p, int size1, int size2) {
     return DoTestOutgoingSidecar(p, {std::string(size1, 'a'), 
std::string(size2, 'b')});
   }
diff --git a/src/kudu/rpc/rpc-test.cc b/src/kudu/rpc/rpc-test.cc
index 704a27735..c14de85a6 100644
--- a/src/kudu/rpc/rpc-test.cc
+++ b/src/kudu/rpc/rpc-test.cc
@@ -23,6 +23,7 @@
 #include <cstring>
 #include <functional>
 #include <limits>
+#include <map>
 #include <memory>
 #include <ostream>
 #include <set>
@@ -98,6 +99,7 @@ DECLARE_int32(tcp_keepalive_probe_period_s);
 DECLARE_int32(tcp_keepalive_retry_period_s);
 DECLARE_int32(tcp_keepalive_retry_count);
 
+using std::map;
 using std::tuple;
 using std::shared_ptr;
 using std::string;
@@ -878,6 +880,61 @@ TEST_P(TestRpc, TestTCPKeepalive) {
       req, &resp, &controller));
 }
 
+// Test that the RpcSidecar transfers the messages within RPC max message
+// size limit and errors out when limit is crossed.
+TEST_P(TestRpc, TestRpcSidecarWithSizeLimits) {
+  int64_t rpc_message_size = 30 * 1024 * 1024; // 30 MB
+  map<std::pair<int64_t, int64_t>, string> rpc_max_message_server_and_client;
+
+  // 1. Set the rpc max size to:
+  // Server: 50 MB,
+  // Client: 70 MB,
+  // so that client is able to accommodate the response size of 60 MB.
+  EmplaceIfNotPresent(&rpc_max_message_server_and_client,
+                      std::make_pair((50 * 1024 * 1024), (70 * 1024 * 1024)),
+                      "OK");
+
+  // 2. Set the rpc max size to:
+  // Server: 50 MB,
+  // Client: 20 MB,
+  // so that client rejects the inbound message of size 60 MB.
+  EmplaceIfNotPresent(&rpc_max_message_server_and_client,
+                      std::make_pair((50 * 1024 * 1024), (20 * 1024 * 1024)),
+                      "Network error: RPC frame had a length of");
+
+  for (auto const& rpc_max_message_size : rpc_max_message_server_and_client) {
+    // Set rpc_max_message_size.
+    int64_t server_rpc_max_size = rpc_max_message_size.first.first;
+    int64_t client_rpc_max_size = rpc_max_message_size.first.second;
+
+    // Set up server.
+    Sockaddr server_addr = bind_addr();
+
+    MessengerBuilder mb("TestRpc.TestRpcSidecarWithSizeLimits");
+    mb.set_rpc_max_message_size(server_rpc_max_size)
+      .set_metric_entity(metric_entity_);
+    if (enable_ssl()) mb.enable_inbound_tls();
+
+    shared_ptr<Messenger> messenger;
+    ASSERT_OK(mb.Build(&messenger));
+
+    ASSERT_OK(StartTestServerWithCustomMessenger(&server_addr, messenger, 
enable_ssl()));
+
+    // Set up client.
+    shared_ptr<Messenger> client_messenger;
+    ASSERT_OK(CreateMessenger("Client", &client_messenger,
+                              1, enable_ssl(), "", "", "", "", 
client_rpc_max_size));
+    Proxy p(client_messenger, server_addr, kRemoteHostName,
+            GenericCalculatorService::static_service_name());
+
+    Status status = DoTestSidecarWithSizeLimits(&p, rpc_message_size, 
rpc_message_size);
+
+    // OK: If size of payload is within max rpc message size limit.
+    // Close connection: If size of payload is beyond max message size limit.
+    ASSERT_STR_CONTAINS(status.ToString(), rpc_max_message_size.second);
+  }
+}
+
 // Test that the RpcSidecar transfers the expected messages.
 TEST_P(TestRpc, TestRpcSidecar) {
   // Set up server.
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index 54405c25f..2696a272a 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -99,7 +99,9 @@ InboundTransfer::InboundTransfer(faststring initial_buf)
   buf_.resize(std::max<size_t>(kMsgLengthPrefixLength, buf_.size()));
 }
 
-Status InboundTransfer::ReceiveBuffer(Socket* socket, faststring* extra_4) {
+Status InboundTransfer::ReceiveBuffer(Socket* socket,
+                                      faststring* extra_4,
+                                      const int64_t rpc_max_message_size) {
   static constexpr int kExtraReadLength = kMsgLengthPrefixLength;
   if (total_length_ == 0) {
     // We haven't yet parsed the message length. It's possible that the
@@ -127,10 +129,10 @@ Status InboundTransfer::ReceiveBuffer(Socket* socket, 
faststring* extra_4) {
     // The length prefix doesn't include its own 4 bytes, so we have to
     // add that back in.
     total_length_ = NetworkByteOrder::Load32(&buf_[0]) + 
kMsgLengthPrefixLength;
-    if (PREDICT_FALSE(total_length_ > FLAGS_rpc_max_message_size)) {
+    if (PREDICT_FALSE(total_length_ > rpc_max_message_size)) {
       return Status::NetworkError(Substitute(
-          "RPC frame had a length of $0, but we only support messages up to $1 
bytes "
-          "long.", total_length_, FLAGS_rpc_max_message_size));
+          "RPC frame had a length of $0, but we only support messages up to $1 
bytes long.",
+          total_length_, rpc_max_message_size));
     }
     if (PREDICT_FALSE(total_length_ <= kMsgLengthPrefixLength)) {
       return Status::NetworkError(
diff --git a/src/kudu/rpc/transfer.h b/src/kudu/rpc/transfer.h
index adf59b2a8..bcacff340 100644
--- a/src/kudu/rpc/transfer.h
+++ b/src/kudu/rpc/transfer.h
@@ -76,7 +76,7 @@ class InboundTransfer {
   // after this call returns OK), up to 4 extra bytes may have been read
   // from the socket and stored in 'extra_4'. In that case, any previous 
content of
   // 'extra_4' is replaced by this extra bytes.
-  Status ReceiveBuffer(Socket* socket, faststring* extra_4);
+  Status ReceiveBuffer(Socket* socket, faststring* extra_4, int64_t 
rpc_max_message_size);
 
   // Return true if any bytes have yet been sent.
   bool TransferStarted() const;
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index 186c71ebf..32522559f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -10069,7 +10069,7 @@ TEST_P(DownloadSuperblockInBatchTest, 
TestDownloadSuperblockInBatch) {
                   "--rpc_max_message_size_enable_validation=false "
                   // Set --rpc_max_message_size very small, so it is easy for 
the size of
                   // superblock over --rpc_max_message_size. It is used to 
repeat the network
-                  // error, see line 9477.
+                  // error, see 'else' condition below.
                   "--rpc_max_message_size=$5 "
                   // This flag and --rpc_max_message_size are in a group flag 
validator, so
                   // it is also should be set a small value.

Reply via email to