This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new b4c3172 [master] KUDU-3036 reject DDLs which would lead to DoS
b4c3172 is described below
commit b4c317299f39ae0530cb10702e540cd22393656c
Author: Alexey Serbin <[email protected]>
AuthorDate: Mon Jan 13 17:09:39 2020 -0800
[master] KUDU-3036 reject DDLs which would lead to DoS
With this patch, masters reject DDL requests which would lead to DoS
situations described in KUDU-3036.
Added a test scenario to reproduce KUDU-3036. In the scenario, the size
of the incoming AlterTable RPC is much smaller than the size of the
UpdateConsensus RPC that would be sent to update the system catalog accordingly.
Change-Id: I7c212b635313a0aec5d9ebf8329319bae47f2dd2
Reviewed-on: http://gerrit.cloudera.org:8080/14999
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
---
.../integration-tests/master_replication-itest.cc | 154 ++++++++++++++++++++-
src/kudu/master/sys_catalog-test.cc | 2 +-
src/kudu/master/sys_catalog.cc | 40 +++++-
src/kudu/master/sys_catalog.h | 3 +
src/kudu/rpc/serialization.cc | 2 +-
src/kudu/rpc/transfer.cc | 8 ++
6 files changed, 203 insertions(+), 6 deletions(-)
diff --git a/src/kudu/integration-tests/master_replication-itest.cc
b/src/kudu/integration-tests/master_replication-itest.cc
index b259e5a..a36f907 100644
--- a/src/kudu/integration-tests/master_replication-itest.cc
+++ b/src/kudu/integration-tests/master_replication-itest.cc
@@ -19,6 +19,7 @@
#include <memory>
#include <ostream>
#include <string>
+#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
@@ -29,20 +30,24 @@
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/common/common.pb.h"
+#include "kudu/common/partial_row.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/replica_management.pb.h"
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h"
#include "kudu/master/mini_master.h"
+#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
+#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
@@ -54,17 +59,23 @@
DECLARE_bool(raft_prepare_replacement_before_eviction);
+METRIC_DECLARE_counter(sys_catalog_oversized_write_requests);
+
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
+using kudu::client::KuduTableAlterer;
using kudu::client::KuduTableCreator;
using kudu::client::sp::shared_ptr;
-using kudu::consensus::ReplicaManagementInfoPB;
+using kudu::cluster::ExternalMiniCluster;
+using kudu::cluster::ExternalMiniClusterOptions;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
+using kudu::consensus::ReplicaManagementInfoPB;
using std::string;
+using std::unique_ptr;
using std::vector;
using strings::Substitute;
@@ -349,6 +360,147 @@ TEST_F(MasterReplicationTest,
TestConnectToFollowerMasterOnly) {
EXPECT_LE(successes, 1);
}
+// In this test, a Kudu master receives an RPC under the maximum size limit,
+// however the corresponding update on the system tablet would exceed that limit.
+class MasterReplicationAndRpcSizeLimitTest : public KuduTest {
+ public:
+ void SetUp() override {
+ KuduTest::SetUp();
+ ASSERT_OK(Prepare());
+ }
+
+ protected:
+ static constexpr const char* const kKeyColumnName = "key";
+ static constexpr auto kNumMasters = 3;
+ static constexpr auto kNumTabletServers = 3;
+ static constexpr auto kReplicationFactor = 3;
+ // In case of standard builds, shorten the Raft heartbeat and election timeout
+ // intervals to speed up the test.
+#if defined(ADDRESS_SANITIZER) || defined(THREAD_SANITIZER)
+ static constexpr auto kHbIntervalMs = 500;
+#else
+ static constexpr auto kHbIntervalMs = 200;
+#endif
+ static constexpr auto kMaxMissedHbs = 2;
+
+ Status Prepare() {
+ const vector<string> ts_extra_flags = {
+ // Set custom timings for Raft heartbeats and heard-from-leader timeouts.
+ Substitute("--raft_heartbeat_interval_ms=$0", kHbIntervalMs),
+ Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
kMaxMissedHbs),
+ // This test scenario creates many replicas per tablet server and causes
+ // multiple re-elections, so it's necessary to accommodate for spikes in
+ // Raft heartbeat traffic coming from one tablet server to another,
+ // especially in case of sanitizer builds.
+ "--rpc_service_queue_length=200",
+ };
+ const vector<string> master_extra_flags = {
+ // Set custom timings for Raft heartbeats and heard-from-leader timeouts.
+ Substitute("--raft_heartbeat_interval_ms=$0", kHbIntervalMs),
+ Substitute("--leader_failure_max_missed_heartbeat_periods=$0",
kMaxMissedHbs),
+ // Turn off the validator for the --rpc_max_message_size flag since this
+ // scenario uses non-conventional setting for the flag.
+ "--rpc_max_message_size_enable_validation=false",
+ // Set the maximum size for the master RPC to 64 KiByte.
+ Substitute("--rpc_max_message_size=$0", 64 * 1024),
+ // The updates on the system catalog tablet might be accumulated by Raft
+ // in various scenarios due to connectivity, leadership changes, etc.
+ // Subtracting an extra 1K to account for extra fields while wrapping
+ // messages to replicate into UpdateConsensus RPC.
+ Substitute("--consensus_max_batch_size_bytes=$0", 63 * 1024),
+ };
+
+ ExternalMiniClusterOptions opts;
+ opts.num_masters = kNumMasters;
+ opts.num_tablet_servers = kNumTabletServers;
+ opts.extra_master_flags = master_extra_flags;
+ opts.extra_tserver_flags = ts_extra_flags;
+ cluster_.reset(new ExternalMiniCluster(std::move(opts)));
+ RETURN_NOT_OK(cluster_->Start());
+ return cluster_->CreateClient(nullptr, &client_);
+ }
+
+ // Create a table named 'table_name' with pre-defined structure.
+ Status CreateTable(const string& table_name, int replication_factor) {
+ // In this test scenario, long dimension labels are used to make
+ // the corresponding update on the system tablet longer than the incoming
+ // RPC to master (e.g. a tablet report or AlterTable request). In real life,
+ // it's possible to achieve the same by other means, but it would be
+ // necessary to create many more tablet replicas in the cluster.
+ static const char* const kLabelSuffix =
+
"_very_looooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+
"oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+
"oooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo"
+
"ooooooooooooooooooooooooooooooooooooooooooooooooonooooog_label_suffix";
+ KuduSchemaBuilder b;
+ b.AddColumn("key")->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
+ b.AddColumn("string_column")->Type(KuduColumnSchema::STRING);
+ RETURN_NOT_OK(b.Build(&schema_));
+
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ const auto s = table_creator->table_name(table_name)
+ .schema(&schema_)
+ .set_range_partition_columns({ kKeyColumnName })
+ .add_hash_partitions({ kKeyColumnName }, 10)
+ .num_replicas(replication_factor)
+ .dimension_label(table_name + kLabelSuffix)
+ .Create();
+ return s;
+ }
+
+ Status GetMetricValue(const MetricPrototype& metric_proto, int64_t* value) {
+ int leader_idx;
+ RETURN_NOT_OK(cluster_->GetLeaderMasterIndex(&leader_idx));
+ return itest::GetInt64Metric(
+ cluster_->master(leader_idx)->bound_http_hostport(),
+ &METRIC_ENTITY_server,
+ nullptr,
+ &metric_proto,
+ "value",
+ value);
+ }
+
+ unique_ptr<cluster::ExternalMiniCluster> cluster_;
+ client::sp::shared_ptr<client::KuduClient> client_;
+ KuduSchema schema_;
+};
+
+// Make sure leader master rejects AlterTable requests which result in updates
+// on the system tablet which it would not be able to push to its followers
+// due to the limit set by the --rpc_max_message_size flag.
+// This scenario simulates conditions described in KUDU-3036.
+TEST_F(MasterReplicationAndRpcSizeLimitTest, AlterTable) {
+ const string table_name = "table_to_alter";
+ ASSERT_OK(CreateTable(table_name, kReplicationFactor));
+
+ // After a fresh start, there should be no rejected writes to the system catalog
+ // tablet yet.
+ int64_t val;
+ ASSERT_OK(GetMetricValue(METRIC_sys_catalog_oversized_write_requests, &val));
+ ASSERT_EQ(0, val);
+
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ alterer->DropRangePartition(schema_.NewRow(), schema_.NewRow());
+ for (auto i = 0; i < 50; ++i) {
+ unique_ptr<KuduPartialRow> lower(schema_.NewRow());
+ unique_ptr<KuduPartialRow> upper(schema_.NewRow());
+ ASSERT_OK(lower->SetInt64(kKeyColumnName, 10 * i));
+ ASSERT_OK(upper->SetInt64(kKeyColumnName, 10 * (i + 1)));
+ alterer->AddRangePartition(lower.release(), upper.release());
+ }
+ auto s = alterer->timeout(MonoDelta::FromSeconds(30))->Alter();
+
+ // The DDL attempt above (i.e. the Alter() call) produces an oversized write
+ // request to the system catalog tablet. The request should have been rejected
+ // and the corresponding metric incremented.
+ ASSERT_TRUE(s.IsInvalidArgument()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "too large for current setting of the "
+ "--rpc_max_message_size flag");
+ ASSERT_OK(GetMetricValue(METRIC_sys_catalog_oversized_write_requests, &val));
+ ASSERT_EQ(1, val);
+
+ NO_FATALS(cluster_->AssertNoCrashes());
+}
} // namespace master
} // namespace kudu
diff --git a/src/kudu/master/sys_catalog-test.cc
b/src/kudu/master/sys_catalog-test.cc
index 5f2f107..c67740c 100644
--- a/src/kudu/master/sys_catalog-test.cc
+++ b/src/kudu/master/sys_catalog-test.cc
@@ -408,7 +408,7 @@ TEST_F(SysCatalogTest, AttemptOverwriteCertAuthorityInfo) {
const Status s = master_->catalog_manager()->sys_catalog()->
AddCertAuthorityEntry(ca_entry);
ASSERT_TRUE(s.IsCorruption()) << s.ToString();
- ASSERT_EQ("Corruption: One or more rows failed to write", s.ToString());
+ ASSERT_EQ("Corruption: failed to write one or more rows", s.ToString());
}
} // namespace master
diff --git a/src/kudu/master/sys_catalog.cc b/src/kudu/master/sys_catalog.cc
index fb5e47a..e82f20f 100644
--- a/src/kudu/master/sys_catalog.cc
+++ b/src/kudu/master/sys_catalog.cc
@@ -94,6 +94,15 @@ DEFINE_double(sys_catalog_fail_during_write, 0.0,
"Fraction of the time when system table writes will fail");
TAG_FLAG(sys_catalog_fail_during_write, hidden);
+DECLARE_int64(rpc_max_message_size);
+
+METRIC_DEFINE_counter(server, sys_catalog_oversized_write_requests,
+ "System Catalog Oversized Write Requests",
+ kudu::MetricUnit::kRequests,
+ "Number of oversized write requests to the system "
+ "catalog tablet rejected since start",
+ kudu::MetricLevel::kWarn);
+
using kudu::consensus::ConsensusMetadata;
using kudu::consensus::ConsensusMetadataManager;
using kudu::consensus::ConsensusStatePB;
@@ -106,6 +115,7 @@ using kudu::tablet::Tablet;
using kudu::tablet::TabletReplica;
using kudu::tserver::WriteRequestPB;
using kudu::tserver::WriteResponsePB;
+using std::back_inserter;
using std::function;
using std::set;
using std::shared_ptr;
@@ -124,6 +134,20 @@ class Message;
namespace kudu {
namespace master {
+namespace {
+
+// A utility function to get the upper limit for the size of a write request
+// into the system tablet represented by WriteRequestPB. The write request is
+// eventually wrapped into ConsensusRequestPB when a leader master propagates
+// the updates to the followers. Wrapping WriteRequestPB into ConsensusRequestPB
+// requires adding extra fields, and 1KB looks like a reasonable estimate
+// for the upper limit of the added delta.
+size_t GetMaxWriteRequestSize() {
+ return std::max<int64_t>(0, FLAGS_rpc_max_message_size - 1024);
+}
+
+} // anonymous namespace
+
static const char* const kSysCatalogTableColType = "entry_type";
static const char* const kSysCatalogTableColId = "entry_id";
static const char* const kSysCatalogTableColMetadata = "metadata";
@@ -162,6 +186,8 @@ SysCatalogTable::SysCatalogTable(Master* master,
master_(master),
cmeta_manager_(new ConsensusMetadataManager(master_->fs_manager())),
leader_cb_(std::move(leader_cb)) {
+ oversized_write_requests_ = master_->metric_entity()->FindOrCreateCounter(
+ &METRIC_sys_catalog_oversized_write_requests);
}
SysCatalogTable::~SysCatalogTable() {
@@ -216,7 +242,7 @@ Status SysCatalogTable::Load(FsManager *fs_manager) {
peer_addrs_from_opts.end(),
peer_addrs_from_disk.begin(),
peer_addrs_from_disk.end(),
- std::back_inserter(symm_diff));
+ back_inserter(symm_diff));
if (!symm_diff.empty()) {
string msg = Substitute(
"on-disk master list ($0) and provided master list ($1) differ. "
@@ -451,9 +477,17 @@ Status SysCatalogTable::WaitUntilRunning() {
}
Status SysCatalogTable::SyncWrite(const WriteRequestPB& req) {
+ DCHECK(req.has_tablet_id());
+ DCHECK(req.has_schema());
MAYBE_RETURN_FAILURE(FLAGS_sys_catalog_fail_during_write,
Status::RuntimeError(kInjectedFailureStatusMsg));
-
+ const size_t request_size = req.ByteSizeLong();
+ if (request_size > GetMaxWriteRequestSize()) {
+ oversized_write_requests_->Increment();
+ return Status::InvalidArgument(
+ Substitute("write request ($0 bytes in size) is too large for current "
+ "setting of the --rpc_max_message_size flag",
request_size));
+ }
CountDownLatch latch(1);
WriteResponsePB resp;
gscoped_ptr<tablet::TransactionCompletionCallback> txn_callback(
@@ -476,7 +510,7 @@ Status SysCatalogTable::SyncWrite(const WriteRequestPB&
req) {
LOG(WARNING) << Substitute(
"row $0: $1", error.row_index(),
StatusFromPB(error.error()).ToString());
}
- return Status::Corruption("One or more rows failed to write");
+ return Status::Corruption("failed to write one or more rows");
}
return Status::OK();
}
diff --git a/src/kudu/master/sys_catalog.h b/src/kudu/master/sys_catalog.h
index b213ad3..54b5fd6 100644
--- a/src/kudu/master/sys_catalog.h
+++ b/src/kudu/master/sys_catalog.h
@@ -38,6 +38,7 @@
namespace kudu {
+class Counter;
class FsManager;
class MetricRegistry;
class RowBlockRow;
@@ -323,6 +324,8 @@ class SysCatalogTable {
ElectedLeaderCallback leader_cb_;
consensus::RaftPeerPB local_peer_pb_;
+
+ scoped_refptr<Counter> oversized_write_requests_;
};
} // namespace master
diff --git a/src/kudu/rpc/serialization.cc b/src/kudu/rpc/serialization.cc
index 473a817..78ea295 100644
--- a/src/kudu/rpc/serialization.cc
+++ b/src/kudu/rpc/serialization.cc
@@ -69,7 +69,7 @@ void SerializeMessage(const MessageLite& message, faststring*
param_buf,
// this is a safe limitation.
CHECK_LE(total_size, std::numeric_limits<uint32_t>::max());
- if (total_size > FLAGS_rpc_max_message_size) {
+ if (PREDICT_FALSE(total_size > FLAGS_rpc_max_message_size)) {
LOG(WARNING) << Substitute("Serialized $0 ($1 bytes) is larger than the
maximum configured "
"RPC message size ($2 bytes). "
"Sending anyway, but peer may reject the data.",
diff --git a/src/kudu/rpc/transfer.cc b/src/kudu/rpc/transfer.cc
index 8f84011..b268e77 100644
--- a/src/kudu/rpc/transfer.cc
+++ b/src/kudu/rpc/transfer.cc
@@ -36,6 +36,11 @@
#include "kudu/util/logging.h"
#include "kudu/util/net/socket.h"
+DEFINE_bool(rpc_max_message_size_enable_validation, true,
+ "Whether to turn off validation for --rpc_max_message_size flag. "
+ "This is a test-only flag.");
+TAG_FLAG(rpc_max_message_size_enable_validation, unsafe);
+
DEFINE_int64(rpc_max_message_size, (50 * 1024 * 1024),
"The maximum size of a message that any RPC that the server will
accept. "
"Must be at least 1MB.");
@@ -43,6 +48,9 @@ TAG_FLAG(rpc_max_message_size, advanced);
TAG_FLAG(rpc_max_message_size, runtime);
static bool ValidateMaxMessageSize(const char* flagname, int64_t value) {
+ if (!FLAGS_rpc_max_message_size_enable_validation) {
+ return true;
+ }
if (value < 1 * 1024 * 1024) {
LOG(ERROR) << flagname << " must be at least 1MB.";
return false;