This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 438216ad3 KUDU-3461 [client] Avoid impala crash by returning error if
invalid tablet id found
438216ad3 is described below
commit 438216ad33a7d096f276eef6cc52981c3b2fc2c9
Author: Ashwani Raina <[email protected]>
AuthorDate: Wed Jul 26 20:48:05 2023 +0530
KUDU-3461 [client] Avoid impala crash by returning error if invalid tablet
id found
Kudu C++ clients maintain per client level metacache. So, if one client is
issuing
insert ops on a partition and another client issues a 'cache invalidating
worth' DDL
op on the same partition, first client's cache won't get invalidated. In
some
workflows, this could potentially lead to an infinite recursion situation in
C++ client code that can eventually end up crashing the Impala daemon, due to
stack overflow.
The same situation can happen if it is a mix of C++ and Java clients as
long as
there is at least one C++ client involved in the workflow.
The short-term fix is to avoid the crash by detecting the invalid tablet id
condition and returning an error from the Kudu C++ client to the Impala daemon.
Following are the steps to reproduce the issue from impala-shell:
+++
1. drop table if exists impala_crash;
2. create table if not exists impala_crash \
( dt string, col string, primary key(dt) ) \
partition by range(dt) ( partition values <= '00000000' ) \
stored as kudu;
3. alter table impala_crash drop if exists range partition value='20230301';
4. alter table impala_crash add if not exists range partition
value='20230301';
5. insert into impala_crash values ('20230301','abc');
6. alter table impala_crash drop if exists range partition value='20230301';
7. alter table impala_crash add if not exists range partition
value='20230301';
8. insert into impala_crash values ('20230301','abc');
+++
The last statement, i.e. #8, causes impalad (connected to impala-shell) to
crash.
With this change, last statement query fails and throws
"Status::InvalidArgument()" error.
This change also includes unit test to test both scenarios:
1. Reproduce the infinite recursion case without a fix, expect it to crash
2. Reproduce the infinite recursion case with fix, expect it to return
"Status::InvalidArgument()" error instead of crashing due to stack
overflow.
Inserting the row again after the last step should succeed as expected
as the stale cache entry for the tablet is cleared by now.
Change-Id: Ia09cf6fb1b1d10f1ad13a62b5c863bcd1e3ab26a
Reviewed-on: http://gerrit.cloudera.org:8080/20270
Reviewed-by: Alexey Serbin <[email protected]>
Tested-by: Kudu Jenkins
---
src/kudu/client/batcher.cc | 12 +++-
src/kudu/client/client-test.cc | 145 ++++++++++++++++++++++++++++++++++++++++-
src/kudu/client/meta_cache.cc | 87 +++++++++++++++++++------
src/kudu/client/meta_cache.h | 11 ++++
4 files changed, 233 insertions(+), 22 deletions(-)
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index cfde2eca3..8a3ad81c2 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -22,6 +22,7 @@
#include <mutex>
#include <ostream>
#include <string>
+#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
@@ -57,7 +58,7 @@
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/security/token.pb.h"
#include "kudu/tserver/tserver.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
#include "kudu/util/logging.h"
#include "kudu/util/pb_util.h"
@@ -501,6 +502,15 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status&
rpc_cb_status) {
result.status = StatusFromPB(resp_.error().status());
}
+ // In a multi-client usage scenario, where one client is used for DDL ops
+ // and other is used for DML ops on the same range partition, tablet
+ // entry in metacache may become invalid, rendering it useless for
+ // subsequent operation. This check lets client know about the same.
+ if (result.status.IsInvalidArgument()) {
+ result.result = RetriableRpcStatus::NON_RETRIABLE_ERROR;
+ return result;
+ }
+
// If we get TABLET_NOT_FOUND, the replica we thought was leader has been
deleted.
if (resp_.has_error() && resp_.error().code() ==
tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
result.result = RetriableRpcStatus::RESOURCE_NOT_FOUND;
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 600013465..2335afe8c 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -151,6 +151,7 @@ DECLARE_bool(master_support_auto_incrementing_column);
DECLARE_bool(master_support_connect_to_master_rpc);
DECLARE_bool(master_support_immutable_column_attribute);
DECLARE_bool(mock_table_metrics_for_testing);
+DECLARE_bool(prevent_kudu_3461_infinite_recursion);
DECLARE_bool(rpc_listen_on_unix_domain_socket);
DECLARE_bool(rpc_trace_negotiation);
DECLARE_bool(scanner_inject_service_unavailable_on_continue_scan);
@@ -184,12 +185,13 @@ DECLARE_int64(on_disk_size_for_testing);
DECLARE_string(location_mapping_cmd);
DECLARE_string(superuser_acl);
DECLARE_string(user_acl);
+DECLARE_uint32(default_deleted_table_reserve_seconds);
DECLARE_uint32(dns_resolver_cache_capacity_mb);
DECLARE_uint32(txn_keepalive_interval_ms);
DECLARE_uint32(txn_staleness_tracker_interval_ms);
DECLARE_uint32(txn_manager_status_table_num_replicas);
+
DEFINE_int32(test_scan_num_rows, 1000, "Number of rows to insert and scan");
-DECLARE_uint32(default_deleted_table_reserve_seconds);
METRIC_DECLARE_counter(block_manager_total_bytes_read);
METRIC_DECLARE_counter(location_mapping_cache_hits);
@@ -500,6 +502,30 @@ class ClientTest : public KuduTest {
NO_FATALS(CheckNoRpcOverflow());
}
+ // Inserts 'num_rows' test rows using 'client' and expect an invalid
argument error
+ void InsertTestRowsWithInvalidArgError(KuduClient* client,
+ KuduTable* table,
+ int num_rows,
+ int first_row = 0) const {
+ shared_ptr<KuduSession> session = client->NewSession();
+ ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+ NO_FATALS(InsertTestRows(table, session.get(), num_rows, first_row));
+
+ // Flush is expected to fail because of stale tablet entry in metacache
+ ASSERT_TRUE(session->Flush().IsIOError());
+
+ vector<KuduError*> errors;
+ ElementDeleter d(&errors);
+ bool overflow;
+ session->GetPendingErrors(&errors, &overflow);
+ ASSERT_FALSE(overflow);
+ ASSERT_EQ(errors.size(), num_rows);
+
+ // Check for only the first error.
+ ASSERT_TRUE(errors[0]->status().IsInvalidArgument());
+ ASSERT_STR_CONTAINS(errors[0]->status().ToString(), "Tablet id is not
valid anymore");
+ }
+
// Inserts 'num_rows' using the default client.
void InsertTestRows(KuduTable* table, int num_rows, int first_row = 0) const
{
InsertTestRows(client_.get(), table, num_rows, first_row);
@@ -10243,5 +10269,122 @@ TEST_F(ClientTestAutoIncrementingColumn,
CreateTableFeatureFlag) {
Substitute("Error creating table $0 on the master: cluster does not
support "
"CreateTable with feature(s) AUTO_INCREMENTING_COLUMN",
kTableName));
}
+
+class ClientTestMetacache : public ClientTest {
+ public:
+ // Helper method to reproduce the stack overflow scenario caused
+ // due to a bug in client metacache code where it ends up in infinite
+ // recursion loop:
+ // 1. Using object's existing client_ for DDL ops, start by creating
+ // a table with a range partition
+ // 2. Using a new DML client (client_insert_ops), insert a row
+ // 3. Using DDL client (client_), drop the partition
+ // 4. Using DDL client (client_), add a partition with same range
+ // 5. Using insert client (client_insert_ops), insert a row again
+ // 6. If fix (to avoid infinite recursion) is enabled:
+ // - #5 is expected to fail with "invalid tablet" error
+ // - Re-insert of row using same client is expected to pass
+ // as cache invalidation has happened in previous step.
+ // 7. If fix (to avoid infinite recursion) is disabled:
+ // - #5 is expected to cause a crash due to stack overflow
+ void TestClientMetacacheHelper(const string& table_name) {
+ // Create a partition boundary [0, 1)
+ unique_ptr<KuduPartialRow> lower_bound(schema_.NewRow());
+ ASSERT_OK(lower_bound->SetInt32("key", 0));
+ unique_ptr<KuduPartialRow> upper_bound(schema_.NewRow());
+ ASSERT_OK(upper_bound->SetInt32("key", 1));
+
+ // Create a table with above range partition using client_
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+
+ // Add a range partition
+ table_creator->add_range_partition(lower_bound.release(),
+ upper_bound.release());
+
+ // Create the table
+ ASSERT_OK(table_creator->table_name(table_name)
+ .schema(&schema_)
+ .num_replicas(1)
+ .set_range_partition_columns({ "key" })
+ .Create());
+
+ // Create a new client for update/insert ops
+ shared_ptr<KuduClient> client_insert_ops;
+ shared_ptr<KuduTable> table_insert_ops;
+ ASSERT_OK(cluster_->CreateClient(nullptr, &client_insert_ops));
+ ASSERT_OK(client_insert_ops->OpenTable(table_name, &table_insert_ops));
+
+ // Insert a row to the partition using client_insert_ops
+ NO_FATALS(InsertTestRows(client_insert_ops.get(), table_insert_ops.get(),
+ 1, 0));
+
+ // Drop the range partitions using the DDL client i.e. client_
+ unique_ptr<KuduTableAlterer> alterer(client_->NewTableAlterer(table_name));
+ lower_bound.reset(schema_.NewRow());
+ ASSERT_OK(lower_bound->SetInt32("key", 0));
+ upper_bound.reset(schema_.NewRow());
+ ASSERT_OK(upper_bound->SetInt32("key", 1));
+ alterer->DropRangePartition(lower_bound.release(), upper_bound.release());
+ ASSERT_OK(alterer->Alter());
+
+ // Add same range again using same DDL client i.e. client_
+ // This, essentially, adds a new partition with same range
+ // but with a different incarnation (a new generation, if you will).
+ // Any subsequent DML op is expected to consult the new metacache
+ // entry that represents the new tablet generation as the previous
+ // cache entry is not valid anymore.
+ alterer.reset(client_->NewTableAlterer(table_name));
+ lower_bound.reset(schema_.NewRow());
+ ASSERT_OK(lower_bound->SetInt32("key", 0));
+ upper_bound.reset(schema_.NewRow());
+ ASSERT_OK(upper_bound->SetInt32("key", 1));
+ alterer->AddRangePartition(lower_bound.release(), upper_bound.release());
+ ASSERT_OK(alterer->Alter());
+
+ if (FLAGS_prevent_kudu_3461_infinite_recursion) {
+ // Insert a row to the table using same insert client i.e.
client_insert_ops
+ // This is expected to fail with "Status::InvalidArgument()" error
+ NO_FATALS(InsertTestRowsWithInvalidArgError(client_insert_ops.get(),
+ table_insert_ops.get(),
+ 1, 0));
+
+ // Try inserting row again. This time it is expected to succeed
+ NO_FATALS(InsertTestRows(client_insert_ops.get(), table_insert_ops.get(),
+ 1, 0));
+ } else {
+ // Death tests use fork(), which is unsafe particularly in a threaded
+ // context. For this test, Google Test detected more than one threads.
See
+ //
https://github.com/google/googletest/blob/master/docs/advanced.md#death-tests-and-threads
+ // for more explanation. If not set to "threadsafe", the test gets stuck.
+ GTEST_FLAG_SET(death_test_style, "threadsafe");
+
+ // Insert a row to the table using same insert client i.e.
client_insert_ops
+ // This is expected to crash due to stack overflow
+ ASSERT_DEATH((InsertTestRows(client_insert_ops.get(),
+ table_insert_ops.get(),
+ 1, 0)), "");
+ }
+ }
+};
+
+// The purpose of this test is to verify case of infinite recursion
+// caused due to stale cache entry for a deleted tablet being re-created
+// via one client and populated with a row using a different client.
+// Refer KUDU-3461 for more details. The infinite recursion ends up in stack
+// overflow crash.
+TEST_F(ClientTestMetacache, TestClientMetacacheRecursion) {
+ FLAGS_prevent_kudu_3461_infinite_recursion = false;
+ TestClientMetacacheHelper(CURRENT_TEST_NAME());
+}
+
+// The purpose of this test is to verify fix for stack overflow by
+// avoiding infinite recursion and returning an error back to client
+// upon detection of stale cache entry for deleted tablet and cleanup
+// the stale client cache entry. Subsequent insert is expected to succeed.
+// Refer KUDU-3461 for more details.
+TEST_F(ClientTestMetacache, TestClientMetacacheInvalidation) {
+ FLAGS_prevent_kudu_3461_infinite_recursion = true;
+ TestClientMetacacheHelper(CURRENT_TEST_NAME());
+}
} // namespace client
} // namespace kudu
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 02cbe15df..232e1900b 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -101,6 +101,11 @@ DEFINE_int32(client_tablet_locations_by_id_ttl_ms, 60 * 60
* 1000, // 60 minutes
TAG_FLAG(client_tablet_locations_by_id_ttl_ms, advanced);
TAG_FLAG(client_tablet_locations_by_id_ttl_ms, runtime);
+DEFINE_bool(prevent_kudu_3461_infinite_recursion, true,
+ "Whether or not to prevent infinite recursion caused due to stale "
+ "client metacache as described in KUDU-3461. Used for testing
only!");
+TAG_FLAG(prevent_kudu_3461_infinite_recursion, unsafe);
+
namespace kudu {
namespace client {
namespace internal {
@@ -279,7 +284,7 @@ Status RemoteTablet::Refresh(
}
void RemoteTablet::MarkStale() {
- VLOG(1) << "Marking tablet stale, tablet id " << tablet_id_;
+ VLOG(2) << Substitute("Marking tablet stale, tablet id $0", tablet_id_);
stale_ = true;
}
@@ -462,12 +467,14 @@ void MetaCacheServerPicker::PickLeader(const
ServerPickedCallback& callback,
RemoteTabletServer* leader = nullptr;
if (!tablet_->stale()) {
leader = tablet_->LeaderTServer();
- if (leader) {
- VLOG(1) << "Client copy of tablet " << tablet_->tablet_id()
- << " is fresh, leader uuid " << leader->permanent_uuid();
- } else {
- VLOG(1) << "Client copy of tablet " << tablet_->tablet_id()
- << " is fresh (no leader).";
+ if (VLOG_IS_ON(2)) {
+ if (leader) {
+ VLOG(2) << Substitute("Client entry of tablet $0 is fresh, leader uuid
$1",
+ tablet_->tablet_id(), leader->permanent_uuid());
+ } else {
+ VLOG(2) << Substitute("Client entry of tablet $0 is fresh (no
leader).",
+ tablet_->tablet_id());
+ }
}
bool marked_as_follower = false;
@@ -529,7 +536,32 @@ void MetaCacheServerPicker::PickLeader(const
ServerPickedCallback& callback,
// shift the write to another tablet (i.e. if it's since been split).
if (!leader) {
if (table_) {
- VLOG(1) << "Table " << table_->name() << ": No valid leader, lookup
tablet by key.";
+ VLOG(2) << Substitute("Table $0: No valid leader, lookup tablet by key.",
+ table_->name());
+ if (PREDICT_TRUE(FLAGS_prevent_kudu_3461_infinite_recursion)) {
+ // First check metacache for the tablet
+ scoped_refptr<RemoteTablet> remote_tablet;
+ Status fastpath_status = meta_cache_->FastLookupTabletByKey(table_,
+
tablet_->partition().begin(),
+
MetaCache::LookupType::kPoint,
+
&remote_tablet);
+ if (!fastpath_status.IsIncomplete()) {
+ VLOG(2) << Substitute("Explicit fastpath lookup succeeded(maybe), "
+ "proceed with callback, table: $0",
+ table_->name());
+ if (remote_tablet &&
+ remote_tablet->tablet_id() != tablet_->tablet_id()) {
+ // Skip further processing if tablet in question has turned invalid
+ LOG(INFO) << Substitute("Tablet under process found to be invalid,
"
+ "table: $0 - old tabletid: $1, new
tabletid: $2",
+ table_->name(), tablet_->tablet_id(),
+ remote_tablet->tablet_id());
+ callback(Status::InvalidArgument("Tablet id is not valid anymore"),
+ nullptr);
+ return;
+ }
+ }
+ }
meta_cache_->LookupTabletByKey(
table_,
tablet_->partition().begin(),
@@ -587,6 +619,8 @@ void MetaCacheServerPicker::LookUpTabletCb(const
ServerPickedCallback& callback,
// If we couldn't lookup the tablet call the user callback immediately.
if (!status.ok()) {
+ VLOG(2) << Substitute("Tablet lookup failed, tablet id: $0, status: $1",
+ tablet_->tablet_id(), status.ToString());
callback(status, nullptr);
return;
}
@@ -929,6 +963,8 @@ void LookupRpc::SendRpcSlowPath() {
req_.set_replica_type_filter(master::ANY_REPLICA);
}
+ VLOG(2) << Substitute("Slowpathing RPC $0: refreshing our metadata from the
Master",
+ ToString());
// Actually send the request.
AsyncLeaderMasterRpc::SendRpc();
}
@@ -1225,22 +1261,22 @@ bool MetaCache::LookupEntryByKeyFastPath(const
KuduTable* table,
const TabletMap* tablets = FindOrNull(tablets_by_table_and_key_,
table->id());
if (PREDICT_FALSE(!tablets)) {
// No cache available for this table.
- VLOG(1) << "No cache available for table " << table->name();
+ VLOG(2) << Substitute("No cache available for table $0", table->name());
return false;
}
const MetaCacheEntry* e = FindFloorOrNull(*tablets, partition_key);
if (PREDICT_FALSE(!e)) {
// No tablets with a start partition key lower than 'partition_key'.
- VLOG(1) << "Table " << table->name()
- << ": No tablets found with a start key lower than input key.";
+ VLOG(2) << Substitute("Table $0: No tablets found with a start key lower
than input key.",
+ table->name());
return false;
}
// Stale entries must be re-fetched.
if (e->stale()) {
- VLOG(1) << "Table " << table->name() << ": Stale entry for tablet "
- << e->tablet()->tablet_id() << " found, must be re-fetched.";
+ VLOG(2) << Substitute("Table $0: Stale entry for tablet $1 found, must be
re-fetched.",
+ table->name(), e->tablet()->tablet_id());
return false;
}
@@ -1249,19 +1285,28 @@ bool MetaCache::LookupEntryByKeyFastPath(const
KuduTable* table,
return true;
}
- VLOG(1) << "Table " << table->name() << ": LookupEntryByKeyFastPath failed!";
+ VLOG(2) << Substitute("Table $0: Fastpath lookup by key failed!",
table->name());
return false;
}
+Status MetaCache::FastLookupTabletByKey(const KuduTable* table,
+ PartitionKey partition_key,
+ MetaCache::LookupType lookup_type,
+ scoped_refptr<RemoteTablet>*
remote_tablet) {
+ return DoFastPathLookup(table, &partition_key, lookup_type, remote_tablet);
+}
+
Status MetaCache::DoFastPathLookup(const KuduTable* table,
PartitionKey* partition_key,
MetaCache::LookupType lookup_type,
scoped_refptr<RemoteTablet>* remote_tablet)
{
+ static const string err_str = "No tablet covering the requested range
partition";
MetaCacheEntry entry;
while (PREDICT_TRUE(LookupEntryByKeyFastPath(table, *partition_key, &entry))
&& (entry.is_non_covered_range() || entry.tablet()->HasLeader())) {
- VLOG(4) << "Fast lookup: found " << entry.DebugString(table) << " for "
- << DebugLowerBoundPartitionKey(table, *partition_key);
+ VLOG(4) << Substitute("Fast lookup: found $0 for $1",
+ entry.DebugString(table),
+ DebugLowerBoundPartitionKey(table, *partition_key));
if (!entry.is_non_covered_range()) {
if (remote_tablet) {
*remote_tablet = entry.tablet();
@@ -1269,11 +1314,12 @@ Status MetaCache::DoFastPathLookup(const KuduTable*
table,
return Status::OK();
}
if (lookup_type == LookupType::kPoint ||
entry.upper_bound_partition_key().empty()) {
- return Status::NotFound("No tablet covering the requested range
partition",
- entry.DebugString(table));
+ VLOG(2) << Substitute(err_str);
+ return Status::NotFound(err_str, entry.DebugString(table));
}
*partition_key = entry.upper_bound_partition_key();
}
+ VLOG(2) << Substitute("Fastpath lookup failed with incomplete status");
return Status::Incomplete("");
}
@@ -1428,8 +1474,9 @@ void MetaCache::LookupTabletByKey(const KuduTable* table,
return;
}
- VLOG(1) << "Fastpath lookup failed with " << fastpath_status.ToString()
- << ". Proceed with rpc lookup, table: " << table->name();
+ VLOG(2) << Substitute("Fastpath lookup failed with $0. Proceed with RPC
lookup,"
+ " table: $1",
+ fastpath_status.ToString(), table->name());
LookupRpc* rpc = new LookupRpc(this,
callback,
table,
diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h
index 090426674..baaec3add 100644
--- a/src/kudu/client/meta_cache.h
+++ b/src/kudu/client/meta_cache.h
@@ -437,6 +437,17 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> {
scoped_refptr<RemoteTablet>* remote_tablet,
const StatusCallback& callback);
+ // Do a fastpath look up of tablet that hosts the given partition key for a
+ // table. If available, the tablet is stored in 'remote_tablet' (if not
NULL).
+ // Returns:
+ // - NotFound if the lookup hits a non-covering range.
+ // - Incomplete if the fast path was not possible.
+ // - OK if the lookup was successful.
+ Status FastLookupTabletByKey(const KuduTable* table,
+ PartitionKey partition_key,
+ LookupType lookup_type,
+ scoped_refptr<RemoteTablet>* remote_tablet);
+
// Look up which tablet hosts the given partition key for a table.
// If @split_size_bytes set nonzero, send SplitKeyRangeRPC to remote tserver,
// otherwise search only occurs locally.