This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new c1759a99d KUDU-1261 introduce ARRAY_1D_COLUMN_TYPE feature for tservers
c1759a99d is described below
commit c1759a99d609d7485bb8d928fb58da7bbe7dca9f
Author: Alexey Serbin <[email protected]>
AuthorDate: Thu Oct 16 23:28:25 2025 -0700
KUDU-1261 introduce ARRAY_1D_COLUMN_TYPE feature for tservers
This changelist introduces a new feature flag ARRAY_1D_COLUMN_TYPE
for Kudu tablet servers to reflect whether a tablet server is capable
of hosting tablets with array columns. Also, the system catalog's code
has been updated to require the feature flag when sending requests
to create a table with array-type columns or to alter a table
that contains at least one array-type column.
The rationale behind adding the new feature flag is to make it easier
to reason about and troubleshoot situations when a Kudu master of newer
version works with tablet servers of prior versions, and the latter ones
do not support array type columns.
This patch includes test scenarios to provide basic coverage
for the newly introduced functionality.
As a part of this changelist, I also updated the logic of the catalog
manager's RetryingTSRpcTask::RpcCallback() method to not retry sending
the same RPC to a tablet server again and again if that tablet server
responds with 'unsupported feature flags'. This behavior is gated by the
newly introduced flag, which is set to 'false' (i.e. do not retry) by default:
--catalog_manager_retry_tserver_task_on_unsupported_feature_flags
Change-Id: I07a8e6b6a90a90562e9ad0be35702f21b49d2da5
Reviewed-on: http://gerrit.cloudera.org:8080/23559
Reviewed-by: Abhishek Chennaka <[email protected]>
Tested-by: Alexey Serbin <[email protected]>
---
src/kudu/client/service-features-test.cc | 116 +++++++++++++++++++++++++++++++
src/kudu/master/catalog_manager.cc | 70 ++++++++++++++++---
src/kudu/tserver/tablet_server-test.cc | 37 ++++++++++
src/kudu/tserver/tablet_service.cc | 12 +++-
src/kudu/tserver/tserver.proto | 3 +
5 files changed, 227 insertions(+), 11 deletions(-)
diff --git a/src/kudu/client/service-features-test.cc
b/src/kudu/client/service-features-test.cc
index acb878cc2..5ff6d9ad9 100644
--- a/src/kudu/client/service-features-test.cc
+++ b/src/kudu/client/service-features-test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <functional>
#include <memory>
#include <string>
#include <type_traits>
@@ -29,11 +30,14 @@
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/util/logging_test_util.h"
+#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_bool(master_support_1d_array_columns);
+DECLARE_bool(tserver_support_1d_array_columns);
using kudu::client::sp::shared_ptr;
using kudu::cluster::InternalMiniCluster;
@@ -146,5 +150,117 @@ TEST_F(ServiceFeaturesITest, ArrayColumnSupportMaster) {
}
}
+TEST_F(ServiceFeaturesITest, ArrayColumnSupportTabletServer) { // Creates, alters and drops tables while toggling FLAGS_tserver_support_1d_array_columns.
+ SKIP_IF_SLOW_NOT_ALLOWED();
+ constexpr const char* const kTableName = "array_columns_support_t";
+ constexpr const char* const kErrMsgPattern =
+ "Remote error: unsupported feature flags"; // substring searched for in the captured log messages below
+
+ const Schema array_schema({
+ ColumnSchema("key", INT32),
+ ColumnSchemaBuilder().name("val").type(INT32).array(true).nullable(true)
+ }, 1);
+
+ // Make the tablet server report the ARRAY_1D_COLUMN_TYPE feature as unsupported.
+ FLAGS_tserver_support_1d_array_columns = false;
+
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ KuduSchema schema(KuduSchema::FromSchema(array_schema));
+ table_creator->table_name(kTableName)
+ .schema(&schema)
+ .add_hash_partitions({ "key" }, 2)
+ .num_replicas(1)
+ .timeout(MonoDelta::FromSeconds(1)); // short timeout: Create() is expected to time out below
+
+ {
+ StringVectorSink capture_logs;
+ {
+ // Capture the log messages during attempts to create tablets for the
new table.
+ ScopedRegisterSink reg(&capture_logs);
+ const auto s = table_creator->Create();
+ // Since the only tablet server doesn't support creating tablets with
schemas
+ // containing array columns, waiting for tablets' creation eventually
times out.
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Timed out waiting for Table
Creation");
+ }
+
+ bool pattern_found = false; // becomes true once a captured message contains both substrings
+ for (const string& s : capture_logs.logged_msgs()) {
+ if (string::npos != s.find("CreateTablet RPC failed for tablet") &&
+ string::npos != s.find(kErrMsgPattern)) {
+ pattern_found = true;
+ }
+ }
+ ASSERT_TRUE(pattern_found);
+ }
+
+ // Since the system catalog retries adding tablet replicas, re-enable the
+ // feature for the tablet server and wait until the system catalog succeeds
+ // in creating all the tablet replicas.
+ FLAGS_tserver_support_1d_array_columns = true;
+ ASSERT_EVENTUALLY([&] {
+ bool in_progress = true;
+ ASSERT_OK(client_->IsCreateTableInProgress(kTableName, &in_progress));
+ ASSERT_FALSE(in_progress);
+ });
+
+ // Disable the feature at the server side again.
+ FLAGS_tserver_support_1d_array_columns = false;
+ {
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->AddColumn("c0")->NestedType(
+ KuduColumnSchema::KuduNestedTypeDescriptor(
+
KuduColumnSchema::KuduArrayTypeDescriptor(KuduColumnSchema::INT8)));
+ alt->timeout(MonoDelta::FromSeconds(1)); // short timeout: Alter() is expected to time out below
+
+ StringVectorSink capture_logs;
+ {
+ // Capture the log messages during attempts to alter tablets for the new
table.
+ ScopedRegisterSink reg(&capture_logs);
+ const auto s = alt->Alter();
+ ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
+ ASSERT_STR_CONTAINS(s.ToString(), "Timed out waiting for AlterTable");
+ }
+
+ bool pattern_found = false; // becomes true once a captured message contains both substrings
+ for (const string& s : capture_logs.logged_msgs()) {
+ if (string::npos != s.find("AlterTable RPC failed for tablet") &&
+ string::npos != s.find(kErrMsgPattern)) {
+ pattern_found = true;
+ }
+ }
+ ASSERT_TRUE(pattern_found);
+ }
+
+ // It should be possible to drop the table with array columns, even if
+ // the tablet server doesn't support the ARRAY_1D_COLUMN_TYPE feature.
+ ASSERT_OK(client_->DeleteTable(kTableName));
+
+ {
+ // The tablet server doesn't need to support the new ARRAY_1D_COLUMN_TYPE
+ // feature if creating a new table without array columns.
+ const Schema table_schema({
+ ColumnSchema("key", INT32),
+ ColumnSchema("val", INT64),
+ }, 1);
+
+ unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+ KuduSchema schema(KuduSchema::FromSchema(table_schema));
+ table_creator->table_name(kTableName)
+ .schema(&schema)
+ .add_hash_partitions({ "key" }, 2)
+ .num_replicas(1);
+
+ ASSERT_OK(table_creator->Create());
+
+ // Adding a non-array column is possible as well.
+ unique_ptr<KuduTableAlterer> alt(client_->NewTableAlterer(kTableName));
+ alt->AddColumn("c0")->Type(KuduColumnSchema::INT8)->Nullable();
+ ASSERT_OK(alt->Alter());
+
+ ASSERT_OK(client_->DeleteTable(kTableName));
+ }
+}
+
} // namespace client
} // namespace kudu
diff --git a/src/kudu/master/catalog_manager.cc
b/src/kudu/master/catalog_manager.cc
index 85f1cb761..c651798fb 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -117,6 +117,7 @@
#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
+#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/security/cert.h"
#include "kudu/security/crypto.h"
#include "kudu/security/tls_context.h"
@@ -322,6 +323,12 @@ DEFINE_bool(catalog_manager_support_live_row_count, true,
TAG_FLAG(catalog_manager_support_live_row_count, hidden);
TAG_FLAG(catalog_manager_support_live_row_count, runtime);
+DEFINE_bool(catalog_manager_retry_tserver_task_on_unsupported_feature_flags,
false,
+ "Whether to retry tasks that send asynchronous RPCs to tablet "
+ "servers if receiving unsupported feature flags error");
+TAG_FLAG(catalog_manager_retry_tserver_task_on_unsupported_feature_flags,
advanced);
+TAG_FLAG(catalog_manager_retry_tserver_task_on_unsupported_feature_flags,
runtime);
+
DEFINE_bool(catalog_manager_enable_chunked_tablet_reports, true,
"Whether to split the tablet report data received from one tablet "
"server into chunks when persisting it in the system catalog. "
@@ -492,6 +499,7 @@ using kudu::tablet::TabletDataState;
using kudu::tablet::TabletReplica;
using kudu::tablet::TabletStatePB;
using kudu::tserver::TabletServerErrorPB;
+using kudu::tserver::TabletServerFeatures;
using std::make_optional;
using std::nullopt;
using std::optional;
@@ -4684,15 +4692,31 @@ Status RetryingTSRpcTask::Run() {
}
void RetryingTSRpcTask::RpcCallback() {
- if (!rpc_.status().ok()) {
- KLOG_EVERY_N_SECS(WARNING, 1) << Substitute("TS $0: $1 RPC failed for
tablet $2: $3",
- target_ts_desc_->ToString(),
type_name(),
- tablet_id(),
rpc_.status().ToString());
- } else if (state() != kStateAborted) {
- HandleResponse(attempt_); // Modifies state_.
+ const auto& s = rpc_.status();
+ if (s.ok()) {
+ if (state() != kStateAborted) {
+ HandleResponse(attempt_); // Modifies state_.
+ }
+ } else {
+ DCHECK(!s.IsRemoteError() || (s.IsRemoteError() && rpc_.error_response()));
+ if (s.IsRemoteError() &&
+ rpc_.error_response() &&
+ rpc_.error_response()->unsupported_feature_flags_size() > 0 &&
+
!FLAGS_catalog_manager_retry_tserver_task_on_unsupported_feature_flags) {
+ // Once marked as failed, the task is unregistered below.
+ MarkFailed();
+ LOG(WARNING) << Substitute(
+ "TS $0: $1 RPC failed for tablet $2, no further retry: $3",
+ target_ts_desc_->ToString(), type_name(), tablet_id(), s.ToString());
+ } else {
+ KLOG_EVERY_N_SECS(WARNING, 1) << Substitute(
+ "TS $0: $1 RPC failed for tablet $2: $3",
+ target_ts_desc_->ToString(), type_name(), tablet_id(), s.ToString());
+ }
}
- // Schedule a retry if the RPC call was not successful.
+ // Schedule a retry if the RPC call was not successful and the task's overall
+ // status is still 'running'.
if (RescheduleWithBackoffDelay()) {
return;
}
@@ -4807,12 +4831,13 @@ class AsyncCreateReplica : public
RetrySpecificTSRpcTask {
public:
// The tablet lock must be acquired for reading before making this call.
- AsyncCreateReplica(Master *master,
+ AsyncCreateReplica(Master* master,
const string& permanent_uuid,
const scoped_refptr<TabletInfo>& tablet,
const TabletMetadataLock& tablet_lock)
- : RetrySpecificTSRpcTask(master, permanent_uuid, tablet->table().get()),
- tablet_id_(tablet->id()) {
+ : RetrySpecificTSRpcTask(master, permanent_uuid, tablet->table().get()),
+ tablet_id_(tablet->id()),
+ schema_has_nested_columns_(false) {
deadline_ = start_ts_ +
MonoDelta::FromMilliseconds(FLAGS_tablet_creation_timeout_ms);
TableMetadataLock table_lock(tablet->table().get(), LockMode::READ);
@@ -4830,6 +4855,15 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask
{
table_lock.data().pb.extra_config());
req_.set_dimension_label(tablet_lock.data().pb.dimension_label());
req_.set_table_type(table_lock.data().pb.table_type());
+
+ Schema schema;
+ const auto s = SchemaFromPB(req_.schema(), &schema);
+ if (PREDICT_TRUE(s.ok())) {
+ schema_has_nested_columns_ = schema.has_nested_columns();
+ } else {
+ KLOG_EVERY_N_SECS(WARNING, 60) << Substitute(
+ "could not convert table's $0 schema from PB: $1", req_.table_id(),
s.ToString());
+ }
}
string type_name() const override { return "CreateTablet"; }
@@ -4863,6 +4897,11 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask
{
VLOG(1) << Substitute("Sending $0 request to $1 (attempt $2): $3",
type_name(), target_ts_desc_->ToString(), attempt,
SecureDebugString(req_));
+ if (schema_has_nested_columns_) {
+ DCHECK(!ContainsKey(rpc_.required_server_features(),
+ TabletServerFeatures::ARRAY_1D_COLUMN_TYPE));
+ rpc_.RequireServerFeature(TabletServerFeatures::ARRAY_1D_COLUMN_TYPE);
+ }
ts_proxy_->CreateTabletAsync(req_, &resp_, &rpc_,
[this]() { this->RpcCallback(); });
return true;
@@ -4872,6 +4911,7 @@ class AsyncCreateReplica : public RetrySpecificTSRpcTask {
const string tablet_id_;
tserver::CreateTabletRequestPB req_;
tserver::CreateTabletResponsePB resp_;
+ bool schema_has_nested_columns_;
};
// Send a DeleteTablet() RPC request.
@@ -5051,6 +5091,16 @@ class AsyncAlterTable : public RetryingTSRpcTask {
l.Unlock();
+    // Require ARRAY_1D_COLUMN_TYPE from the target tablet server only when the
+    // schema being sent contains nested (array) columns. The warning below is
+    // reserved for a genuine PB-to-Schema conversion failure: a schema that
+    // converts fine but has no nested columns must not be reported as an error.
+    Schema schema;
+    const auto s = SchemaFromPB(req.schema(), &schema);
+    if (PREDICT_TRUE(s.ok())) {
+      if (schema.has_nested_columns()) {
+        DCHECK(!ContainsKey(rpc_.required_server_features(),
+                            TabletServerFeatures::ARRAY_1D_COLUMN_TYPE));
+        rpc_.RequireServerFeature(TabletServerFeatures::ARRAY_1D_COLUMN_TYPE);
+      }
+    } else {
+      KLOG_EVERY_N_SECS(WARNING, 60) << Substitute(
+          "could not convert tablet's $0 schema from PB: $1", req.tablet_id(), s.ToString());
+    }
VLOG(1) << Substitute("Sending $0 request to $1 (attempt $2): $3",
type_name(), target_ts_desc_->ToString(), attempt,
SecureDebugString(req));
diff --git a/src/kudu/tserver/tablet_server-test.cc
b/src/kudu/tserver/tablet_server-test.cc
index 267a1a55a..929aabba0 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -186,6 +186,7 @@ DECLARE_bool(fail_dns_resolution);
DECLARE_bool(rowset_metadata_store_keys);
DECLARE_bool(scanner_unregister_on_invalid_seq_id);
DECLARE_bool(show_slow_scans);
+DECLARE_bool(tserver_support_1d_array_columns);
DECLARE_double(cfile_inject_corruption);
DECLARE_double(env_inject_eio);
DECLARE_double(env_inject_full);
@@ -4174,6 +4175,42 @@ TEST_F(TabletServerTest, TestCreateTablet_TabletExists) {
}
}
+TEST_F(TabletServerTest, CreateTabletWithArrayColumnUnsupportedFeatureFlag) { // CreateTablet requiring ARRAY_1D_COLUMN_TYPE must fail once the flag is turned off.
+ // Pretend the tablet server doesn't support the ARRAY_1D_COLUMN_TYPE
feature.
+ FLAGS_tserver_support_1d_array_columns = false;
+
+ CreateTabletRequestPB req;
+ req.set_dest_uuid(mini_server_->server()->fs_manager()->uuid());
+
+ Schema schema; // one INT32 key column plus one INT8 array column, built below
+ {
+ SchemaBuilder sb;
+ ASSERT_OK(sb.AddKeyColumn("key", INT32));
+ ASSERT_OK(sb.AddColumn(
+ ColumnSchemaBuilder().name("arr").type(INT8).array(true).Build()));
+ schema = sb.Build();
+ }
+ ASSERT_OK(SchemaToPB(schema, req.mutable_schema()));
+
+ RpcController rpc;
+ rpc.RequireServerFeature(TabletServerFeatures::ARRAY_1D_COLUMN_TYPE); // the call demands the feature that was just switched off
+ CreateTabletResponsePB resp;
+ const auto s = admin_proxy_->CreateTablet(req, &resp, &rpc);
+
+ ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
+ // It's not an application-level response, so 'resp' isn't populated.
+ ASSERT_FALSE(resp.has_error());
+ const auto* err = rpc.error_response(); // the error details are carried by the RPC controller instead
+ ASSERT_NE(nullptr, err);
+ ASSERT_EQ(1, err->unsupported_feature_flags_size());
+ ASSERT_EQ(TabletServerFeatures::ARRAY_1D_COLUMN_TYPE,
+ err->unsupported_feature_flags(0));
+ ASSERT_TRUE(err->has_code());
+ ASSERT_EQ(rpc::ErrorStatusPB::ERROR_INVALID_REQUEST, err->code());
+ ASSERT_TRUE(err->has_message());
+ ASSERT_EQ("unsupported feature flags", err->message());
+}
+
TEST_F(TabletServerTest, TestDeleteTablet) {
scoped_refptr<TabletReplica> tablet;
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index 07d00ccf9..6e949fed5 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -182,6 +182,12 @@ DEFINE_bool(tserver_txn_write_op_handling_enabled, true,
"in the context of multi-row transactions");
TAG_FLAG(tserver_txn_write_op_handling_enabled, hidden);
+DEFINE_bool(tserver_support_1d_array_columns, true,
+ "Whether the tablet server can host a tablet which schema contains
"
+ "one-dimensional array column");
+TAG_FLAG(tserver_support_1d_array_columns, hidden);
+TAG_FLAG(tserver_support_1d_array_columns, runtime);
+
DECLARE_bool(enable_txn_system_client_init);
DECLARE_bool(raft_prepare_replacement_before_eviction);
DECLARE_int32(memory_limit_warn_threshold_percentage);
@@ -1431,13 +1437,15 @@ void
TabletServiceAdminImpl::ParticipateInTransaction(const ParticipantRequestPB
}
bool TabletServiceAdminImpl::SupportsFeature(uint32_t feature) const {
+ // TODO(awong): once transactions are useable, add a feature flag.
switch (feature) {
case TabletServerFeatures::COLUMN_PREDICATES:
case TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES:
case TabletServerFeatures::QUIESCING:
case TabletServerFeatures::BLOOM_FILTER_PREDICATE_V2:
- // TODO(awong): once transactions are useable, add a feature flag.
return true;
+ case TabletServerFeatures::ARRAY_1D_COLUMN_TYPE:
+ return PREDICT_TRUE(FLAGS_tserver_support_1d_array_columns);
default:
return false;
}
@@ -2660,6 +2668,8 @@ bool TabletServiceImpl::SupportsFeature(uint32_t feature)
const {
case TabletServerFeatures::BLOOM_FILTER_PREDICATE_V2:
case TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE:
return true;
+ case TabletServerFeatures::ARRAY_1D_COLUMN_TYPE:
+ return PREDICT_TRUE(FLAGS_tserver_support_1d_array_columns);
default:
return false;
}
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index abe16e69e..8041afd46 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -541,4 +541,7 @@ enum TabletServerFeatures {
// Update to implementation of Fast hash for Bloom filter predicate leads
// to incorrect results if incompatible client and server versions are used.
BLOOM_FILTER_PREDICATE_V2 = 6;
+ // Whether the server supports working with tables having one-dimensional
+ // array type columns.
+ ARRAY_1D_COLUMN_TYPE = 7;
}