This is an automated email from the ASF dual-hosted git repository. zhangyifan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 43ba1013149b7811e4f77ae8bc8e65b1b5ea3d10 Author: shenxingwuying <[email protected]> AuthorDate: Tue Aug 9 20:20:46 2022 +0800 [client] Fix a kudu c++ client bug when using replica_selection policy At the C++ client side, the replica_selection policy (LEADER_ONLY or CLOSEST_REPLICA) was not honored, because the selection was not carried through scan tokens. E.g., with the command 'kudu perf table_scan $master_list $table -columns=id,name -num_threads=4 -nofill_cache -replica_selection="LEADER"', the policy actually applied was CLOSEST_REPLICA. This patch fixes the bug in the client library and adds unit tests. Change-Id: I413f99b6a0b6082c5453358b8333913e4c6264c2 Reviewed-on: http://gerrit.cloudera.org:8080/18877 Reviewed-by: Yuqi Du <[email protected]> Tested-by: Kudu Jenkins Reviewed-by: Yifan Zhang <[email protected]> --- src/kudu/client/client.h | 1 + src/kudu/client/scan_token-internal.cc | 30 +++++++ src/kudu/client/scan_token-test.cc | 140 ++++++++++++++++++++++++++++++++- src/kudu/common/common.proto | 2 + src/kudu/tools/table_scanner.cc | 7 +- 5 files changed, 176 insertions(+), 4 deletions(-) diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 82f393480..8db69404a 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -1063,6 +1063,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> { friend class tools::LeaderMasterProxy; friend class tools::RemoteKsckCluster; friend class tools::TableLister; + friend class ScanTokenTest; FRIEND_TEST(kudu::ClientStressTest, TestUniqueClientIds); FRIEND_TEST(kudu::MetaCacheLookupStressTest, PerfSynthetic); diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc index 91360e6a3..ff16460d0 100644 --- a/src/kudu/client/scan_token-internal.cc +++ b/src/kudu/client/scan_token-internal.cc @@ -237,6 +237,23 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client, configuration->AddConjunctPredicate(std::move(*predicate)); } + switch (message.replica_selection()) { + case 
kudu::ReplicaSelection::LEADER_ONLY: + RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::LEADER_ONLY), + ERROR, "set replica selection LEADER_ONLY failed"); + break; + case kudu::ReplicaSelection::CLOSEST_REPLICA: + RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::CLOSEST_REPLICA), + ERROR, "set replica selection CLOSEST_REPLICA failed"); + break; + case kudu::ReplicaSelection::FIRST_REPLICA: + RETURN_NOT_OK_LOG(configuration->SetSelection(KuduClient::ReplicaSelection::FIRST_REPLICA), + ERROR, "set replica selection FIRST_REPLICA failed"); + break; + default: + return Status::NotSupported("unsupported ReplicaSelection policy"); + } + #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wdeprecated-declarations" if (message.has_lower_bound_primary_key()) { @@ -394,6 +411,19 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) { ColumnPredicateToPB(predicate_pair.second, pb.add_column_predicates()); } + switch (configuration_.selection_) { + case KuduClient::ReplicaSelection::LEADER_ONLY: + pb.set_replica_selection(kudu::ReplicaSelection::LEADER_ONLY); + break; + case KuduClient::ReplicaSelection::CLOSEST_REPLICA: + pb.set_replica_selection(kudu::ReplicaSelection::CLOSEST_REPLICA); + break; + case KuduClient::ReplicaSelection::FIRST_REPLICA: + pb.set_replica_selection(kudu::ReplicaSelection::FIRST_REPLICA); + break; + default: + return Status::InvalidArgument("replica_selection is invalid."); + } const KuduScanner::ReadMode read_mode = configuration_.read_mode(); switch (read_mode) { case KuduScanner::READ_LATEST: diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc index fca2ccbf6..792faa654 100644 --- a/src/kudu/client/scan_token-test.cc +++ b/src/kudu/client/scan_token-test.cc @@ -18,9 +18,12 @@ #include <atomic> #include <cstddef> #include <cstdint> +#include <functional> +#include <map> #include <memory> #include <string> #include <thread> 
+#include <type_traits> #include <unordered_set> #include <utility> #include <vector> @@ -29,9 +32,11 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "kudu/client/client-internal.h" #include "kudu/client/client-test-util.h" #include "kudu/client/client.h" #include "kudu/client/client.pb.h" +#include "kudu/client/meta_cache.h" #include "kudu/client/scan_batch.h" #include "kudu/client/scan_configuration.h" #include "kudu/client/scan_predicate.h" @@ -43,18 +48,22 @@ #include "kudu/common/common.pb.h" #include "kudu/common/partial_row.h" #include "kudu/common/wire_protocol.pb.h" +#include "kudu/gutil/map-util.h" #include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/integration-tests/test_workload.h" #include "kudu/master/catalog_manager.h" #include "kudu/master/master.h" #include "kudu/master/mini_master.h" #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/tablet/tablet_replica.h" #include "kudu/tserver/mini_tablet_server.h" +#include "kudu/tserver/scanners.h" #include "kudu/tserver/tablet_server.h" #include "kudu/tserver/ts_tablet_manager.h" #include "kudu/tserver/tserver.pb.h" +#include "kudu/util/async_util.h" #include "kudu/util/metrics.h" #include "kudu/util/monotime.h" #include "kudu/util/net/sockaddr.h" @@ -76,6 +85,7 @@ using kudu::master::TabletInfo; using kudu::tablet::TabletReplica; using kudu::tserver::MiniTabletServer; using std::atomic; +using std::map; using std::string; using std::thread; using std::unique_ptr; @@ -85,6 +95,9 @@ using std::vector; namespace kudu { namespace client { +static constexpr const int32_t kRecordCount = 1000; +static constexpr const int32_t kBucketNum = 10; + class ScanTokenTest : public KuduTest { protected: void SetUp() override { @@ -135,13 +148,16 @@ class ScanTokenTest : public KuduTest { // Similar to CountRows() above, but use the specified client handle // and run all the scanners sequentially, one by one. 
- static Status CountRowsSeq(KuduClient* client, - const vector<KuduScanToken*>& tokens, - int64_t* row_count) { + static Status CountRowsSeq( + KuduClient* client, + const vector<KuduScanToken*>& tokens, + int64_t* row_count, + KuduClient::ReplicaSelection replica_selection = KuduClient::ReplicaSelection::LEADER_ONLY) { int64_t count = 0; for (auto* t : tokens) { unique_ptr<KuduScanner> scanner; RETURN_NOT_OK(IntoUniqueScanner(client, *t, &scanner)); + RETURN_NOT_OK(scanner->SetSelection(replica_selection)); RETURN_NOT_OK(scanner->Open()); while (scanner->HasMoreRows()) { KuduScanBatch batch; @@ -246,6 +262,79 @@ class ScanTokenTest : public KuduTest { return Status::OK(); } + void PrepareEnvForTestReplicaSelection(shared_ptr<KuduTable>* table, vector<string>* tablet_ids) { + constexpr const char* const kTableName = "replica_selection"; + // Set up the mini cluster + InternalMiniClusterOptions options; + options.num_tablet_servers = 3; + cluster_.reset(new InternalMiniCluster(env_, options)); + ASSERT_OK(cluster_->Start()); + constexpr int kReplicationFactor = 3; + + // Populate the table with data to scan later. + { + // Create a table with 10 partitions, 3 replication factor. + // and write some rows to make sure all partitions have data. 
+ TestWorkload workload(cluster_.get(), TestWorkload::PartitioningType::HASH); + workload.set_table_name(kTableName); + workload.set_num_tablets(kBucketNum); + workload.set_num_replicas(kReplicationFactor); + workload.set_num_write_threads(10); + workload.set_write_batch_size(128); + workload.Setup(); + workload.Start(); + ASSERT_EVENTUALLY([&]() { ASSERT_GE(workload.rows_inserted(), kRecordCount); }); + workload.StopAndJoin(); + } + ASSERT_OK(cluster_->CreateClient(nullptr, &client_)); + ASSERT_OK(client_->OpenTable(kTableName, table)); + ASSERT_NE(nullptr, table->get()); + + vector<client::KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + client::KuduScanTokenBuilder builder(table->get()); + ASSERT_OK(builder.Build(&tokens)); + + tablet_ids->clear(); + tablet_ids->reserve(tokens.size()); + for (const auto* token : tokens) { + tablet_ids->emplace_back(token->tablet().id()); + } + } + + void GetSelectedReplicaCount(const vector<string>& tablet_ids, + KuduClient::ReplicaSelection replication_selection, + map<string, int32_t>* replica_num_by_ts_uuid) { + for (const auto& tablet_id : tablet_ids) { + scoped_refptr<internal::RemoteTablet> rt; + Synchronizer sync; + client_->data_->meta_cache_->LookupTabletById( + client_.get(), tablet_id, MonoTime::Max(), &rt, sync.AsStatusCallback()); + sync.Wait(); + vector<internal::RemoteTabletServer*> tservers; + rt->GetRemoteTabletServers(&tservers); + ASSERT_EQ(3, tservers.size()); + + vector<internal::RemoteTabletServer*> candidates; + internal::RemoteTabletServer* tserver_picked; + ASSERT_OK(client_->data_->GetTabletServer( + client_.get(), rt, replication_selection, {}, &candidates, &tserver_picked)); + auto& count = LookupOrInsert(replica_num_by_ts_uuid, tserver_picked->permanent_uuid(), 0); + count++; + } + } + + void GetScannerCount(map<string, int32_t>* scanner_count_by_ts_uuid) { + scanner_count_by_ts_uuid->clear(); + for (int i = 0; i < cluster_->num_tablet_servers(); i++) { + vector<tserver::ScanDescriptor> 
scanners = + cluster_->mini_tablet_server(i)->server()->scanner_manager()->ListScans(); + scanner_count_by_ts_uuid->insert( + {cluster_->mini_tablet_server(i)->server()->instance_pb().permanent_uuid(), + static_cast<int32_t>(scanners.size())}); + } + } + shared_ptr<KuduClient> client_; unique_ptr<InternalMiniCluster> cluster_; }; @@ -1475,5 +1564,50 @@ TEST_F(ScanTokenTest, ToggleFaultToleranceForScanConfiguration) { ASSERT_EQ(KuduScanner::READ_YOUR_WRITES, sc.read_mode()); } +class ReplicaSelectionTest : public ScanTokenTest, + public ::testing::WithParamInterface<KuduClient::ReplicaSelection> {}; + +INSTANTIATE_TEST_SUITE_P(PickServer, + ReplicaSelectionTest, + ::testing::Values(KuduClient::ReplicaSelection::LEADER_ONLY, + KuduClient::ReplicaSelection::CLOSEST_REPLICA, + KuduClient::ReplicaSelection::FIRST_REPLICA)); + +// TODO(duyuqi) +// Using location assignment to test replica selection for ScanToken, refer to: +// src/kudu/integration-tests/location_assignment-itest.cc#L76-L150 +// +// This unit test checks whether LEADER_ONLY/CLOSEST_REPLICA/FIRST_REPLICA replica selection works +// as expected. +TEST_P(ReplicaSelectionTest, ReplicaSelection) { + shared_ptr<KuduTable> table; + map<string, int32_t> replica_num_by_ts_uuid; + vector<string> tablet_ids; + auto replica_selection = GetParam(); + PrepareEnvForTestReplicaSelection(&table, &tablet_ids); + GetSelectedReplicaCount(tablet_ids, replica_selection, &replica_num_by_ts_uuid); + + map<string, int32_t> scanner_count_by_ts_uuid; + GetScannerCount(&scanner_count_by_ts_uuid); + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + // Scan all the partitions by specific replica selection. + // Launch scan requests. 
+ ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens)); + int64_t row_count = 0; + CountRowsSeq(client_.get(), tokens, &row_count, replica_selection); + + int result = 0; + map<string, int32_t> now_scanner_count_by_ts_uuid; + GetScannerCount(&now_scanner_count_by_ts_uuid); + for (auto& ts_uuid_scanner_count : now_scanner_count_by_ts_uuid) { + const auto& permanent_uuid = ts_uuid_scanner_count.first; + ASSERT_EQ(replica_num_by_ts_uuid[permanent_uuid], + (ts_uuid_scanner_count.second - scanner_count_by_ts_uuid[permanent_uuid])); + result += replica_num_by_ts_uuid[permanent_uuid]; + } + ASSERT_EQ(kBucketNum, result); +} + } // namespace client } // namespace kudu diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto index 8117b65e4..2779d8566 100644 --- a/src/kudu/common/common.proto +++ b/src/kudu/common/common.proto @@ -312,6 +312,8 @@ enum ReplicaSelection { // - Replicas whose tablet server has the same location as the client // - All other replicas CLOSEST_REPLICA = 2; + // Select the first replica in the list. + FIRST_REPLICA = 3; } // The serialized format of a Kudu table partition schema. 
diff --git a/src/kudu/tools/table_scanner.cc b/src/kudu/tools/table_scanner.cc index 5ed43d0f6..614cce862 100644 --- a/src/kudu/tools/table_scanner.cc +++ b/src/kudu/tools/table_scanner.cc @@ -170,11 +170,14 @@ bool ValidateWriteType(const char* flag_name, } constexpr const char* const kReplicaSelectionClosest = "closest"; +constexpr const char* const kReplicaSelectionFirst = "first"; constexpr const char* const kReplicaSelectionLeader = "leader"; + bool ValidateReplicaSelection(const char* flag_name, const string& flag_value) { static const vector<string> kReplicaSelections = { kReplicaSelectionClosest, + kReplicaSelectionFirst, kReplicaSelectionLeader, }; return IsFlagValueAcceptable(flag_name, flag_value, kReplicaSelections); @@ -637,7 +640,7 @@ void TableScanner::SetReadMode(KuduScanner::ReadMode mode) { } Status TableScanner::SetReplicaSelection(const string& selection_str) { - KuduClient::ReplicaSelection selection; + KuduClient::ReplicaSelection selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA; RETURN_NOT_OK(ParseReplicaSelection(selection_str, &selection)); replica_selection_ = selection; return Status::OK(); @@ -799,6 +802,8 @@ Status TableScanner::ParseReplicaSelection( *selection = KuduClient::ReplicaSelection::CLOSEST_REPLICA; } else if (iequals(kReplicaSelectionLeader, selection_str)) { *selection = KuduClient::ReplicaSelection::LEADER_ONLY; + } else if (iequals(kReplicaSelectionFirst, selection_str)) { + *selection = KuduClient::ReplicaSelection::FIRST_REPLICA; } else { return Status::InvalidArgument("invalid replica selection", selection_str); }
