This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit cef110b314f94aadfd3274932e710d05b7cec97c Author: kedeng <[email protected]> AuthorDate: Wed Sep 7 16:56:33 2022 +0800 KUDU-3393 C++ client: support splitting a tablet into multiple key ranges and scanning them concurrently This adds a parameter for building 'KuduScanToken', like this: ` KuduScanTokenBuilder builder(table); vector<KuduScanToken*> tokens; ElementDeleter deleter(&tokens); // set splitSizeBytes builder.SetSplitSizeBytes(1000); ASSERT_OK(builder.Build(&tokens)); ` The default value of split_size_bytes is 0, which means the key range of a tablet is not split. If split_size_bytes is nonzero, we try to send a SplitKeyRangeRPC to the tservers. As a result, we may get more tokens than there are tablets, and because these additional tokens can be scanned concurrently, they help scans finish faster. Change-Id: I207f9584cd558d32fcd9e8de7d6c25e517377272 Reviewed-on: http://gerrit.cloudera.org:8080/18945 Tested-by: Kudu Jenkins Reviewed-by: Yingchun Lai <[email protected]> --- src/kudu/client/client-test.cc | 229 +++++++++++++++++++++++++++++++++ src/kudu/client/client.cc | 4 + src/kudu/client/client.h | 5 + src/kudu/client/meta_cache.cc | 85 +++++++++++- src/kudu/client/meta_cache.h | 27 +++- src/kudu/client/scan_token-internal.cc | 70 ++++++---- src/kudu/client/scan_token-internal.h | 7 + 7 files changed, 402 insertions(+), 25 deletions(-) diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc index b7d5412ae..492a6e90e 100644 --- a/src/kudu/client/client-test.cc +++ b/src/kudu/client/client-test.cc @@ -141,6 +141,7 @@ DECLARE_bool(allow_unsafe_replication_factor); DECLARE_bool(catalog_manager_support_live_row_count); DECLARE_bool(catalog_manager_support_on_disk_size); DECLARE_bool(client_use_unix_domain_sockets); +DECLARE_bool(enable_rowset_compaction); DECLARE_bool(enable_txn_system_client_init); DECLARE_bool(fail_dns_resolution); DECLARE_bool(location_mapping_by_uuid); 
@@ -217,6 +218,7 @@ using kudu::client::sp::shared_ptr; using kudu::tablet::TabletReplica; using kudu::transactions::TxnTokenPB; using 
kudu::tserver::MiniTabletServer; +using std::atomic; using std::function; using std::map; using std::nullopt; @@ -400,6 +402,68 @@ class ClientTest : public KuduTest { } } + void CheckTokensInfo(const vector<KuduScanToken*>& tokens, + int replica_num = 1) { + for (const auto* t : tokens) { + const KuduTablet& tablet = t->tablet(); + ASSERT_EQ(replica_num, tablet.replicas().size()); + const KuduReplica* replica = tablet.replicas()[0]; + ASSERT_TRUE(replica->is_leader()); + const MiniTabletServer* ts = cluster_->mini_tablet_server(0); + ASSERT_EQ(ts->server()->instance_pb().permanent_uuid(), + replica->ts().uuid()); + ASSERT_EQ(ts->bound_rpc_addr().host(), replica->ts().hostname()); + ASSERT_EQ(ts->bound_rpc_addr().port(), replica->ts().port()); + + unique_ptr<KuduTablet> tablet_copy; + { + KuduTablet* ptr; + ASSERT_OK(client_->GetTablet(tablet.id(), &ptr)); + tablet_copy.reset(ptr); + } + ASSERT_EQ(tablet.id(), tablet_copy->id()); + ASSERT_EQ(1, tablet_copy->replicas().size()); + const KuduReplica* replica_copy = tablet_copy->replicas()[0]; + + ASSERT_EQ(replica->is_leader(), replica_copy->is_leader()); + ASSERT_EQ(replica->ts().uuid(), replica_copy->ts().uuid()); + ASSERT_EQ(replica->ts().hostname(), replica_copy->ts().hostname()); + ASSERT_EQ(replica->ts().port(), replica_copy->ts().port()); + } + } + + int CountRows(const vector<KuduScanToken*>& tokens) { + atomic<uint32_t> rows(0); + vector<thread> threads; + for (KuduScanToken* token : tokens) { + string buf; + CHECK_OK(token->Serialize(&buf)); + + threads.emplace_back([this, &rows] (const string& serialized_token) { + shared_ptr<KuduClient> client; + ASSERT_OK(cluster_->CreateClient(nullptr, &client)); + KuduScanner* scanner_ptr; + ASSERT_OK(KuduScanToken::DeserializeIntoScanner( + client.get(), serialized_token, &scanner_ptr)); + unique_ptr<KuduScanner> scanner(scanner_ptr); + ASSERT_OK(scanner->Open()); + + while (scanner->HasMoreRows()) { + KuduScanBatch batch; + ASSERT_OK(scanner->NextBatch(&batch)); + 
rows += batch.NumRows(); + } + scanner->Close(); + }, std::move(buf)); + } + + for (thread& thread : threads) { + thread.join(); + } + + return rows; + } + // Return the number of lookup-related RPCs which have been serviced by the master. int CountMasterLookupRPCs() const { auto ent = cluster_->mini_master()->master()->metric_entity(); @@ -8539,6 +8603,171 @@ TEST_F(ClientTest, NoTxnManager) { } } +class TableKeyRangeTest : public ClientTest { + public: + Status BuildSchema() override { + KuduSchemaBuilder b; + b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey(); + b.AddColumn("int_val")->Type(KuduColumnSchema::INT32)->NotNull(); + b.AddColumn("string_val")->Type(KuduColumnSchema::STRING)->Nullable(); + return b.Build(&schema_); + } + + void SetUp() override { + ClientTest::SetUp(); + + // Set a low flush threshold so we can scan a mix of flushed data in + // in-memory data. + FLAGS_flush_threshold_mb = 0; + FLAGS_flush_threshold_secs = 1; + + // Disable rowset compact to prevent DRSs being merged because they are too small. 
+ FLAGS_enable_rowset_compaction = false; + + ASSERT_OK(CreateTable(kTableName, 1, {}, GeneratePartialRows(), &client_table_)); + } + + typedef vector<pair<unique_ptr<KuduPartialRow>, unique_ptr<KuduPartialRow>>> KuduPartialRowsVec; + KuduPartialRowsVec GeneratePartialRows() const { + KuduPartialRowsVec rows; + vector<int> keys = { 0, 250, 500, 750 }; + for (int i = 0; i < keys.size(); i++) { + unique_ptr<KuduPartialRow> lower_bound(schema_.NewRow()); + CHECK_OK(lower_bound->SetInt32("key", keys[i])); + unique_ptr<KuduPartialRow> upper_bound(schema_.NewRow()); + CHECK_OK(upper_bound->SetInt32("key", keys[i] + 250)); + rows.emplace_back(lower_bound.release(), upper_bound.release()); + } + + return rows; + } + + static void InsertTestRowsWithStrings(KuduTable* table, KuduSession* session, int num_rows) { + vector<int> keys = { 0, 250, 500, 750 }; + string str_val = "*"; + int diff_value = 120; // use to create discontinuous data in a tablet + for (int k = 0; k < keys.size(); k++) { + for (int i = keys[k]; i < keys[k] + num_rows; i++) { + unique_ptr<KuduInsert> insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt32("key", i)); + ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i * 2)); + ASSERT_OK(insert->mutable_row()->SetString("string_val", str_val)); + ASSERT_OK(session->Apply(insert.release())); + ASSERT_OK(session->Flush()); + } + for (int i = keys[k] + diff_value; i < keys[k] + diff_value + num_rows; i++) { + unique_ptr<KuduInsert> insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt32("key", i)); + ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i * 2)); + ASSERT_OK(insert->mutable_row()->SetString("string_val", str_val)); + ASSERT_OK(session->Apply(insert.release())); + ASSERT_OK(session->Flush()); + } + } + } + + protected: + static constexpr const char* const kTableName = "client-testrange"; + + shared_ptr<KuduTable> range_table_; +}; + +TEST_F(TableKeyRangeTest, TestGetTableKeyRange) { + 
client::sp::shared_ptr<KuduTable> table; + ASSERT_OK(client_->OpenTable(kTableName, &table)); + { + // Create session + shared_ptr<KuduSession> session = client_->NewSession(); + session->SetTimeoutMillis(10000); + ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH)); + + // Should have no rows to begin with. + ASSERT_EQ(0, CountRowsFromClient(table.get())); + // Insert rows + NO_FATALS(InsertTestRowsWithStrings(client_table_.get(), session.get(), 100)); + NO_FATALS(CheckNoRpcOverflow()); + } + + { + // search meta cache by default + // + // There are tablet information in the meta cache. + // We give priority to the data in the cache by default. + KuduScanTokenBuilder builder(table.get()); + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + ASSERT_OK(builder.Build(&tokens)); + ASSERT_EQ(4, tokens.size()); + + NO_FATALS(CheckTokensInfo(tokens)); + ASSERT_EQ(800, CountRows(tokens)); + } + + { + // search meta cache by local + // + // If the splitSizeBytes set to 0 , we search the meta cache. 
+ KuduScanTokenBuilder builder(table.get()); + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + // set splitSizeBytes to 0 + builder.SetSplitSizeBytes(0); + ASSERT_OK(builder.Build(&tokens)); + ASSERT_EQ(4, tokens.size()); + + NO_FATALS(CheckTokensInfo(tokens)); + ASSERT_EQ(800, CountRows(tokens)); + } + + uint32_t token_size_a = 0; + { + // search from tserver + KuduScanTokenBuilder builder(table.get()); + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + // set splitSizeBytes < tablet's size + builder.SetSplitSizeBytes(700); + ASSERT_OK(builder.Build(&tokens)); + token_size_a = tokens.size(); + ASSERT_LT(4, token_size_a); + + NO_FATALS(CheckTokensInfo(tokens)); + ASSERT_EQ(800, CountRows(tokens)); + } + + uint32_t token_size_b = 0; + { + // search from tserver + KuduScanTokenBuilder builder(table.get()); + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + // set splitSizeBytes < tablet's size + builder.SetSplitSizeBytes(20); + ASSERT_OK(builder.Build(&tokens)); + token_size_b = tokens.size(); + ASSERT_LT(4, token_size_b); + + NO_FATALS(CheckTokensInfo(tokens)); + ASSERT_EQ(800, CountRows(tokens)); + } + + // diffferent splitSizeBytes leads to different token + ASSERT_NE(token_size_a, token_size_b); + + { + // search from tserver + KuduScanTokenBuilder builder(table.get()); + vector<KuduScanToken*> tokens; + ElementDeleter deleter(&tokens); + // set splitSizeBytes > tablet's size + builder.SetSplitSizeBytes(1024 * 1024 * 1024); + ASSERT_OK(builder.Build(&tokens)); + ASSERT_EQ(tokens.size(), 4); + + NO_FATALS(CheckTokensInfo(tokens)); + ASSERT_EQ(800, CountRows(tokens)); + } +} class ClientTxnManagerProxyTest : public ClientTest { public: void SetUp() override { diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index f47f0ef0d..818bb27e3 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -2306,6 +2306,10 @@ Status KuduScanTokenBuilder::Build(vector<KuduScanToken*>* 
tokens) { return data_->Build(tokens); } +void KuduScanTokenBuilder::SetSplitSizeBytes(uint64_t split_size_bytes) { + return data_->SplitSizeBytes(split_size_bytes); +} + //////////////////////////////////////////////////////////// // KuduReplica //////////////////////////////////////////////////////////// diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index 834d77dce..82f393480 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -3358,6 +3358,11 @@ class KUDU_EXPORT KuduScanTokenBuilder { /// @return Operation result status. Status Build(std::vector<KuduScanToken*>* tokens) WARN_UNUSED_RESULT; + /// Set the size of the data in each key range. + /// The default value is 0 without set and tokens build by meta cache. + /// It's corresponding to 'setSplitSizeBytes' in Java client. + void SetSplitSizeBytes(uint64_t split_size_bytes); + private: class KUDU_NO_EXPORT Data; diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc index 948de02d9..f47b1fd6f 100644 --- a/src/kudu/client/meta_cache.cc +++ b/src/kudu/client/meta_cache.cc @@ -24,6 +24,7 @@ #include <ostream> #include <set> #include <string> +#include <type_traits> #include <utility> #include <vector> @@ -36,6 +37,7 @@ #include "kudu/client/master_proxy_rpc.h" #include "kudu/client/schema.h" #include "kudu/common/common.pb.h" +#include "kudu/common/key_range.h" #include "kudu/common/wire_protocol.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/gutil/basictypes.h" @@ -44,11 +46,13 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/master/master.pb.h" #include "kudu/master/master.proxy.h" -#include "kudu/rpc/response_callback.h" #include "kudu/rpc/rpc.h" #include "kudu/rpc/rpc_controller.h" +#include "kudu/security/token.pb.h" +#include "kudu/tserver/tserver.pb.h" #include "kudu/tserver/tserver_admin.proxy.h" #include "kudu/tserver/tserver_service.proxy.h" +#include "kudu/util/async_util.h" #include "kudu/util/flag_tags.h" #include 
"kudu/util/logging.h" #include "kudu/util/net/dns_resolver.h" @@ -70,6 +74,8 @@ using kudu::master::TSInfoPB; using kudu::pb_util::SecureShortDebugString; using kudu::rpc::BackoffType; using kudu::rpc::CredentialsPolicy; +using kudu::rpc::RpcController; +using kudu::security::SignedTokenPB; using kudu::tserver::TabletServerAdminServiceProxy; using kudu::tserver::TabletServerServiceProxy; using std::set; @@ -1311,6 +1317,83 @@ void MetaCache::ClearCache() { entry_by_tablet_id_.clear(); } +Status MetaCache::GetTableKeyRanges(const KuduTable* table, + const PartitionKey& partition_key, + LookupType lookup_type, + uint64_t split_size_bytes, + const MonoDelta& timeout, + vector<RangeWithRemoteTablet>* range_tablets) { + scoped_refptr<internal::RemoteTablet> tablet; + Synchronizer sync; + MonoTime deadline = MonoTime::Now() + timeout; + LookupTabletByKey(table, + partition_key, + deadline, + lookup_type, + &tablet, + sync.AsStatusCallback()); + RETURN_NOT_OK(sync.Wait()); + + if (split_size_bytes == 0) { + KeyRange key_range( + tablet->partition().begin().ToString(), + tablet->partition().end().ToString(), + split_size_bytes); + range_tablets->emplace_back(key_range, tablet); + return Status::OK(); + } + DCHECK_GT(split_size_bytes, 0); + + RemoteTabletServer *ts; + vector<RemoteTabletServer*> candidates; + set<string> blacklist; + RETURN_NOT_OK(table->client()->data_->GetTabletServer(table->client(), + tablet, + KuduClient::LEADER_ONLY, + blacklist, + &candidates, + &ts)); + CHECK(ts); + CHECK(ts->proxy()); + auto proxy = ts->proxy(); + + tserver::SplitKeyRangeRequestPB req; + tserver::SplitKeyRangeResponsePB resp; + req.set_tablet_id(tablet->tablet_id()); + if (!tablet->partition().begin().ToString().empty()) { + req.set_start_primary_key(tablet->partition().begin().ToString()); + } + if (!tablet->partition().end().ToString().empty()) { + req.set_stop_primary_key(tablet->partition().end().ToString()); + } + req.set_target_chunk_size_bytes(split_size_bytes); + 
SignedTokenPB authz_token; + if (table->client()->data_->FetchCachedAuthzToken(table->id(), &authz_token)) { + *req.mutable_authz_token() = std::move(authz_token); + } else { + // Note: this is expected if attempting to connect to a cluster that does + // not support fine-grained access control. + VLOG(1) << "no authz token for table " << table->id(); + } + + RpcController rpc; + rpc.set_timeout(timeout); + RETURN_NOT_OK(proxy->SplitKeyRange(req, &resp, &rpc)); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + + for (const auto& range : resp.ranges()) { + KeyRange key_range( + range.has_start_primary_key() ? range.start_primary_key() : "", + range.has_stop_primary_key() ? range.stop_primary_key() : "", + range.size_bytes_estimates()); + range_tablets->emplace_back(key_range, tablet); + } + + return Status::OK(); +} + void MetaCache::LookupTabletByKey(const KuduTable* table, PartitionKey partition_key, const MonoTime& deadline, diff --git a/src/kudu/client/meta_cache.h b/src/kudu/client/meta_cache.h index c93d2ba4b..090426674 100644 --- a/src/kudu/client/meta_cache.h +++ b/src/kudu/client/meta_cache.h @@ -18,6 +18,8 @@ // This module is internal to the client and not a public API. 
#pragma once +#include <cstdint> + #include <atomic> #include <map> #include <memory> @@ -32,6 +34,7 @@ #include <gtest/gtest_prod.h> #include "kudu/client/replica_controller-internal.h" +#include "kudu/common/key_range.h" #include "kudu/common/partition.h" #include "kudu/consensus/metadata.pb.h" #include "kudu/gutil/macros.h" @@ -406,6 +409,18 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> { kLowerBound }; + struct RangeWithRemoteTablet { + RangeWithRemoteTablet(KeyRange krange, + const scoped_refptr<internal::RemoteTablet>& rtablet) + : key_range(std::move(krange)), + remote_tablet(rtablet) {} + + KeyRange key_range; + scoped_refptr<internal::RemoteTablet> remote_tablet; + }; + + typedef std::map<PartitionKey, MetaCacheEntry> TabletMap; + // Look up which tablet hosts the given partition key for a table. When it is // available, the tablet is stored in 'remote_tablet' (if not NULL) and the // callback is fired. Only tablets with non-failed LEADERs are considered. @@ -422,6 +437,17 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> { scoped_refptr<RemoteTablet>* remote_tablet, const StatusCallback& callback); + // Look up which tablet hosts the given partition key for a table. + // If @split_size_bytes set nonzero, send SplitKeyRangeRPC to remote tserver, + // otherwise search only occurs locally. + // The result stored in 'range_tablets'. + Status GetTableKeyRanges(const KuduTable* table, + const PartitionKey& partition_key, + LookupType lookup_type, + uint64_t split_size_bytes, + const MonoDelta& timeout, + std::vector<RangeWithRemoteTablet>* range_tablets); + // Look up the locations of the given tablet, storing the result in // 'remote_tablet' if not null, and calling 'lookup_complete_cb' once the // lookup is complete. Only tablets with non-failed LEADERs are considered. @@ -551,7 +577,6 @@ class MetaCache : public RefCountedThreadSafe<MetaCache> { // for key-based lookups. // // Protected by lock_. 
- typedef std::map<PartitionKey, MetaCacheEntry> TabletMap; std::unordered_map<std::string, TabletMap> tablets_by_table_and_key_; // Cache entries for tablets, keyed by tablet ID, used for ID-based lookups. diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc index 434ce5803..91360e6a3 100644 --- a/src/kudu/client/scan_token-internal.cc +++ b/src/kudu/client/scan_token-internal.cc @@ -17,12 +17,12 @@ #include "kudu/client/scan_token-internal.h" -#include <cstdint> #include <map> #include <memory> #include <optional> #include <ostream> #include <string> +#include <type_traits> #include <unordered_map> #include <utility> #include <vector> @@ -40,8 +40,10 @@ #include "kudu/client/shared_ptr.h" // IWYU pragma: keep #include "kudu/client/tablet-internal.h" #include "kudu/client/tablet_server-internal.h" +#include "kudu/common/column_predicate.h" #include "kudu/common/common.pb.h" #include "kudu/common/encoded_key.h" +#include "kudu/common/key_range.h" #include "kudu/common/partition.h" #include "kudu/common/partition_pruner.h" #include "kudu/common/scan_spec.h" @@ -54,7 +56,6 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/master/master.pb.h" #include "kudu/security/token.pb.h" -#include "kudu/util/async_util.h" #include "kudu/util/monotime.h" #include "kudu/util/net/net_util.h" #include "kudu/util/slice.h" @@ -68,7 +69,6 @@ using strings::Substitute; namespace kudu { -class ColumnPredicate; using master::GetTableLocationsResponsePB; using master::TableIdentifierPB; using master::TabletLocationsPB; @@ -432,36 +432,51 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) { pb.set_batch_size_bytes(configuration_.batch_size_bytes()); } - MonoTime deadline = MonoTime::Now() + client->default_admin_operation_timeout(); - PartitionPruner pruner; + vector<MetaCache::RangeWithRemoteTablet> range_tablets; pruner.Init(*table->schema().schema_, table->partition_schema(), configuration_.spec()); while 
(pruner.HasMorePartitionKeyRanges()) { - scoped_refptr<internal::RemoteTablet> tablet; - Synchronizer sync; + PartitionKey key_range; + vector<MetaCache::RangeWithRemoteTablet> tmp_range_tablets; const auto& partition_key = pruner.NextPartitionKey(); - client->data_->meta_cache_->LookupTabletByKey(table, - partition_key, - deadline, - MetaCache::LookupType::kLowerBound, - &tablet, - sync.AsStatusCallback()); - const auto s = sync.Wait(); + Status s = client->data_->meta_cache_->GetTableKeyRanges( + table, + partition_key, + MetaCache::LookupType::kLowerBound, + split_size_bytes_, + client->default_rpc_timeout(), + &tmp_range_tablets); + if (s.IsNotFound()) { - // No more tablets in the table. pruner.RemovePartitionKeyRange({}); continue; } RETURN_NOT_OK(s); - // Check if the meta cache returned a tablet covering a partition key range past - // what we asked for. This can happen if the requested partition key falls - // in a non-covered range. In this case we can potentially prune the tablet. - if (partition_key < tablet->partition().begin() && - pruner.ShouldPrune(tablet->partition())) { - pruner.RemovePartitionKeyRange(tablet->partition().end()); - continue; + if (tmp_range_tablets.empty()) { + pruner.RemovePartitionKeyRange(partition_key); + } else { + // If split_size_bytes_ set to zero, we just do search in meta cache. + // Check if the meta cache returned a tablet covering a partition key range past + // what we asked for. This can happen if the requested partition key falls + // in a non-covered range. In this case we can potentially prune the tablet. 
+ if (split_size_bytes_ == 0 && + partition_key < tmp_range_tablets.back().remote_tablet->partition().begin() && + pruner.ShouldPrune(tmp_range_tablets.back().remote_tablet->partition())) { + pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end()); + continue; + } + for (auto& range_tablet : tmp_range_tablets) { + range_tablets.push_back(range_tablet); + } + pruner.RemovePartitionKeyRange(tmp_range_tablets.back().remote_tablet->partition().end()); } + } + + for (const auto& range_tablet : range_tablets) { + const auto& range = range_tablet.key_range; + const auto& tablet = range_tablet.remote_tablet; + vector<internal::RemoteReplica> replicas; tablet->GetRemoteReplicas(&replicas); @@ -499,6 +514,15 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) { message.CopyFrom(pb); message.set_lower_bound_partition_key(tablet->partition().begin().ToString()); message.set_upper_bound_partition_key(tablet->partition().end().ToString()); + if (!range.start_primary_key().empty() && split_size_bytes_) { + message.clear_lower_bound_primary_key(); + message.set_lower_bound_primary_key(range.start_primary_key()); + } + if (!range.stop_primary_key().empty() && split_size_bytes_) { + message.clear_upper_bound_primary_key(); + message.set_upper_bound_primary_key(range.stop_primary_key()); + } + // Set the tablet metadata so that a call to the master is not needed to // locate the tablet to scan when opening the scanner. 
@@ -557,8 +581,8 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) { std::move(message), std::move(client_tablet)); tokens->push_back(client_scan_token.release()); - pruner.RemovePartitionKeyRange(tablet->partition().end()); } + return Status::OK(); } diff --git a/src/kudu/client/scan_token-internal.h b/src/kudu/client/scan_token-internal.h index 6296da01e..97d464053 100644 --- a/src/kudu/client/scan_token-internal.h +++ b/src/kudu/client/scan_token-internal.h @@ -17,6 +17,8 @@ #pragma once +#include <cstdint> + #include <memory> #include <string> #include <vector> @@ -80,10 +82,15 @@ class KuduScanTokenBuilder::Data { include_tablet_metadata_ = include_metadata; } + void SplitSizeBytes(uint64_t split_size_bytes) { + split_size_bytes_ = split_size_bytes; + } + private: ScanConfiguration configuration_; bool include_table_metadata_ = true; bool include_tablet_metadata_ = true; + uint64_t split_size_bytes_ = 0; }; } // namespace client
