KUDU-2437 Split a tablet into primary key ranges by size When reading data from a Kudu table using Spark, reading takes a long time if a tablet holds a large amount of data. This is because KuduRDD generates one scan token per tablet, so a single Spark task has to process all of the data in that tablet.
TabletServer should provide an RPC interface that can split a tablet into multiple primary key ranges by size. The Kudu client can then choose whether to perform a parallel scan, depending on the case. Change-Id: I9ec4395919f4b54102e458ef5154334c08412e8a Reviewed-on: http://gerrit.cloudera.org:8080/10406 Tested-by: Kudu Jenkins Reviewed-by: Adar Dembo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d4ded71b Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d4ded71b Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d4ded71b Branch: refs/heads/master Commit: d4ded71bc0edadcbe2564d5677d319f35e48dad8 Parents: c2054ae Author: oclarms <[email protected]> Authored: Wed Jun 20 16:25:20 2018 +0800 Committer: Adar Dembo <[email protected]> Committed: Tue Sep 18 17:03:10 2018 +0000 ---------------------------------------------------------------------- src/kudu/common/CMakeLists.txt | 1 + src/kudu/common/common.proto | 10 ++ src/kudu/common/key_range.cc | 34 ++++ src/kudu/common/key_range.h | 59 ++++++ src/kudu/tablet/cfile_set.cc | 4 + src/kudu/tablet/cfile_set.h | 3 + src/kudu/tablet/diskrowset.cc | 9 + src/kudu/tablet/diskrowset.h | 2 + src/kudu/tablet/memrowset.h | 4 + src/kudu/tablet/mock-rowsets.h | 14 +- src/kudu/tablet/rowset.cc | 8 + src/kudu/tablet/rowset.h | 7 + src/kudu/tablet/rowset_info.cc | 105 +++++++++++ src/kudu/tablet/rowset_info.h | 19 ++ src/kudu/tablet/tablet-harness.h | 4 + src/kudu/tablet/tablet-test-util.h | 4 + src/kudu/tablet/tablet-test.cc | 266 +++++++++++++++++++++++++++- src/kudu/tablet/tablet.cc | 22 +++ src/kudu/tablet/tablet.h | 19 ++ src/kudu/tserver/tablet_service.cc | 108 +++++++++++ src/kudu/tserver/tablet_service.h | 4 + src/kudu/tserver/tserver.proto | 28 +++ src/kudu/tserver/tserver_service.proto | 3 + 23 files changed, 734 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/common/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/common/CMakeLists.txt b/src/kudu/common/CMakeLists.txt index 429a2b1..5d29a00 100644 --- a/src/kudu/common/CMakeLists.txt +++ b/src/kudu/common/CMakeLists.txt @@ -46,6 +46,7 @@ set(COMMON_SRCS id_mapping.cc iterator_stats.cc key_encoder.cc + key_range.cc key_util.cc partial_row.cc partition.cc http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/common/common.proto ---------------------------------------------------------------------- diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto index d911113..b3d323e 100644 --- a/src/kudu/common/common.proto +++ b/src/kudu/common/common.proto @@ -391,3 +391,13 @@ message ColumnPredicatePB { IsNull is_null = 6; } } + +// The primary key range of a Kudu tablet. +message KeyRangePB { + // Encoded primary key to begin scanning at (inclusive). + optional bytes start_primary_key = 1 [(kudu.REDACT) = true]; + // Encoded primary key to stop scanning at (exclusive). + optional bytes stop_primary_key = 2 [(kudu.REDACT) = true]; + // Number of bytes in chunk. + required uint64 size_bytes_estimates = 3; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/common/key_range.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/key_range.cc b/src/kudu/common/key_range.cc new file mode 100644 index 0000000..d597cc3 --- /dev/null +++ b/src/kudu/common/key_range.cc @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "kudu/common/key_range.h" + +#include "kudu/common/common.pb.h" + +namespace kudu { + +void KeyRange::ToPB(KeyRangePB* pb) const { + if (!start_key_.empty()) { + pb->set_start_primary_key(start_key_); + } + if (!stop_key_.empty()) { + pb->set_stop_primary_key(stop_key_); + } + pb->set_size_bytes_estimates(size_bytes_); +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/common/key_range.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/key_range.h b/src/kudu/common/key_range.h new file mode 100644 index 0000000..6e555ea --- /dev/null +++ b/src/kudu/common/key_range.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. +#pragma once + +#include <cstdint> +#include <string> +#include <utility> + +namespace kudu { + +class KeyRangePB; + +// A KeyRange describes the range in Tablet. +class KeyRange { + public: + KeyRange(std::string start_key, + std::string stop_key, + uint64_t size_bytes) + : start_key_(std::move(start_key)), + stop_key_(std::move(stop_key)), + size_bytes_(size_bytes) { + } + + // Serializes a KeyRange into a protobuf message. + void ToPB(KeyRangePB* pb) const; + + const std::string& start_primary_key() const { + return start_key_; + } + + const std::string& stop_primary_key() const { + return stop_key_; + } + + const uint64_t size_bytes() const { + return size_bytes_; + } + + private: + std::string start_key_; + std::string stop_key_; + uint64_t size_bytes_; +}; + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/cfile_set.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc index 3d16b39..5f07581 100644 --- a/src/kudu/tablet/cfile_set.cc +++ b/src/kudu/tablet/cfile_set.cc @@ -263,6 +263,10 @@ uint64_t CFileSet::OnDiskDataSize() const { return ret; } +uint64_t CFileSet::OnDiskColumnDataSize(const ColumnId& col_id) const { + return FindOrDie(readers_by_col_id_, col_id)->file_size(); +} + Status CFileSet::FindRow(const RowSetKeyProbe &probe, const IOContext* io_context, boost::optional<rowid_t>* idx, http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/cfile_set.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h index 89743d9..54a854b 100644 --- a/src/kudu/tablet/cfile_set.h +++ b/src/kudu/tablet/cfile_set.h @@ -102,6 +102,9 @@ class CFileSet : public std::enable_shared_from_this<CFileSet> { // Excludes the ad 
hoc index and bloomfiles. uint64_t OnDiskDataSize() const; + // The size on-disk of column cfile's data, in bytes. + uint64_t OnDiskColumnDataSize(const ColumnId& col_id) const; + // Determine the index of the given row key. // Sets *idx to boost::none if the row is not found. Status FindRow(const RowSetKeyProbe& probe, http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/diskrowset.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc index 25c366b..6640822 100644 --- a/src/kudu/tablet/diskrowset.cc +++ b/src/kudu/tablet/diskrowset.cc @@ -783,6 +783,15 @@ uint64_t DiskRowSet::OnDiskBaseDataSize() const { return drss.base_data_size; } +uint64_t DiskRowSet::OnDiskBaseDataColumnSize(const ColumnId& col_id) const { + DCHECK(open_); + shared_lock<rw_spinlock> l(component_lock_); + if (base_data_->has_data_for_column_id(col_id)) { + return base_data_->OnDiskColumnDataSize(col_id); + } + return 0; +} + uint64_t DiskRowSet::OnDiskBaseDataSizeWithRedos() const { DiskRowSetSpace drss; GetDiskRowSetSpaceUsage(&drss); http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/diskrowset.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h index 977cc78..e83f81c 100644 --- a/src/kudu/tablet/diskrowset.h +++ b/src/kudu/tablet/diskrowset.h @@ -377,6 +377,8 @@ class DiskRowSet : public RowSet { uint64_t OnDiskBaseDataSize() const override; + uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const override; + uint64_t OnDiskBaseDataSizeWithRedos() const override; size_t DeltaMemStoreSize() const override; http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/memrowset.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h index 8b2998a..dc382d8 
100644 --- a/src/kudu/tablet/memrowset.h +++ b/src/kudu/tablet/memrowset.h @@ -274,6 +274,10 @@ class MemRowSet : public RowSet, return 0; } + uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const override { + return 0; + } + std::mutex *compact_flush_lock() override { return &compact_flush_lock_; } http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/mock-rowsets.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h index 4a65dfb..49d9d2e 100644 --- a/src/kudu/tablet/mock-rowsets.h +++ b/src/kudu/tablet/mock-rowsets.h @@ -90,6 +90,10 @@ class MockRowSet : public RowSet { LOG(FATAL) << "Unimplemented"; return 0; } + virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const OVERRIDE { + LOG(FATAL) << "Unimplemented"; + return 0; + } virtual uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE { LOG(FATAL) << "Unimplemented"; return 0; @@ -172,10 +176,11 @@ class MockRowSet : public RowSet { class MockDiskRowSet : public MockRowSet { public: MockDiskRowSet(std::string first_key, std::string last_key, - uint64_t size = 1000000) + uint64_t size = 1000000, uint64_t column_size = 200) : first_key_(std::move(first_key)), last_key_(std::move(last_key)), - size_(size) {} + size_(size), + column_size_(column_size) {} virtual Status GetBounds(std::string* min_encoded_key, std::string* max_encoded_key) const OVERRIDE { @@ -192,6 +197,10 @@ class MockDiskRowSet : public MockRowSet { return size_; } + virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const OVERRIDE { + return column_size_; + } + virtual uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE { return size_; } @@ -206,6 +215,7 @@ class MockDiskRowSet : public MockRowSet { const std::string first_key_; const std::string last_key_; const uint64_t size_; + const uint64_t column_size_; }; // Mock which acts like a MemRowSet and has no known bounds. 
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/rowset.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/rowset.cc b/src/kudu/tablet/rowset.cc index 0dec136..a6dadb5 100644 --- a/src/kudu/tablet/rowset.cc +++ b/src/kudu/tablet/rowset.cc @@ -236,6 +236,14 @@ uint64_t DuplicatingRowSet::OnDiskBaseDataSize() const { return size; } +uint64_t DuplicatingRowSet::OnDiskBaseDataColumnSize(const ColumnId& col_id) const { + uint64_t size = 0; + for (const shared_ptr<RowSet> &rs : new_rowsets_) { + size += rs->OnDiskBaseDataColumnSize(col_id); + } + return size; +} + uint64_t DuplicatingRowSet::OnDiskBaseDataSizeWithRedos() const { // The actual value of this doesn't matter, since it won't be selected // for compaction. http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/rowset.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h index 1d1c36d..e2a1ae6 100644 --- a/src/kudu/tablet/rowset.h +++ b/src/kudu/tablet/rowset.h @@ -48,6 +48,7 @@ class RowChangeList; class RowwiseIterator; class Schema; class Slice; +struct ColumnId; namespace consensus { class OpId; @@ -182,6 +183,9 @@ class RowSet { // Does not include bloomfiles, the ad hoc index, or UNDO deltas. virtual uint64_t OnDiskBaseDataSizeWithRedos() const = 0; + // Return the size of this rowset's column in base data on disk, in bytes. + virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const = 0; + // Return the lock used for including this DiskRowSet in a compaction. // This prevents multiple compactions and flushes from trying to include // the same rowset. @@ -405,6 +409,9 @@ class DuplicatingRowSet : public RowSet { // Return the total size on-disk of this rowset's data (i.e. excludes metadata), in bytes. 
uint64_t OnDiskBaseDataSize() const OVERRIDE; + // Return the total size on-disk of this rowset's column data, in bytes. + uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const OVERRIDE; + // Return the size, in bytes, of this rowset's data, not including UNDOs. uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE; http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/rowset_info.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/rowset_info.cc b/src/kudu/tablet/rowset_info.cc index e8395d4..3227817 100644 --- a/src/kudu/tablet/rowset_info.cc +++ b/src/kudu/tablet/rowset_info.cc @@ -36,6 +36,7 @@ #include "kudu/util/logging.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" +#include "kudu/common/key_range.h" using std::shared_ptr; using std::string; @@ -147,6 +148,21 @@ double WidthByDataSize(const Slice& prev, const Slice& next, return weight; } +// Computes the "width" of an interval as above, for the provided columns in the rowsets. 
+double WidthByDataSize(const Slice& prev, const Slice& next, + const unordered_map<RowSet*, RowSetInfo*>& active, + const vector<ColumnId>& col_ids) { + double weight = 0; + + for (const auto& rs_rsi : active) { + double fraction = StringFractionInRange(rs_rsi.second, prev, next); + for (const auto col_id : col_ids) { + weight += rs_rsi.second->size_bytes(col_id) * fraction; + } + } + + return weight; +} void CheckCollectOrderedCorrectness(const vector<RowSetInfo>& min_key, const vector<RowSetInfo>& max_key, @@ -255,6 +271,91 @@ void RowSetInfo::CollectOrdered(const RowSetTree& tree, FinalizeCDFVector(max_key, total_width); } +void RowSetInfo::SplitKeyRange(const RowSetTree& tree, + Slice start_key, + Slice stop_key, + const std::vector<ColumnId>& col_ids, + uint64_t target_chunk_size, + vector<KeyRange>* ranges) { + // check start_key greater than stop_key + CHECK(stop_key.empty() || start_key.compare(stop_key) <= 0); + + // The split process works as follows: + // For each sorted endpoint, first we identify whether it is a + // start or stop endpoint. + // + // At a start point, the associated rowset is added to the + // "active" rowset mapping. + // + // At a stop point, the rowset is removed from the "active" map. + // Note that the "active" map allows access to the incomplete + // RowSetInfo that the RowSet maps to. + // + // The algorithm keeps track of its state - a "sliding window" + // across the keyspace - by maintaining the previous key and current + // value of the total data size traversed over the intervals. 
+ vector<RowSetInfo> active_rsi; + active_rsi.reserve(tree.all_rowsets().size()); + unordered_map<RowSet*, RowSetInfo*> active; + uint64_t chunk_size = 0; + Slice last_bound = start_key; + Slice prev = start_key; + Slice next; + + for (const auto& rse : tree.key_endpoints()) { + RowSet* rs = rse.rowset_; + next = rse.slice_; + + if (prev.compare(next) < 0) { + // reset next when next greater than stop_key + if (!stop_key.empty() && next.compare(stop_key) > 0) { + next = stop_key; + } + + uint64_t interval_size = 0; + if (col_ids.empty()) { + interval_size = WidthByDataSize(prev, next, active); + } else { + interval_size = WidthByDataSize(prev, next, active, col_ids); + } + + if (chunk_size != 0 && chunk_size + interval_size / 2 >= target_chunk_size) { + // Select the interval closest to the target chunk size + ranges->push_back(KeyRange( + last_bound.ToString(), prev.ToString(), chunk_size)); + last_bound = prev; + chunk_size = 0; + } + chunk_size += interval_size; + prev = next; + } + + if (!stop_key.empty() && prev.compare(stop_key) >= 0) { + break; + } + + // Add/remove current RowSetInfo + if (rse.endpoint_ == RowSetTree::START) { + // Store reference from vector. This is safe b/c of reserve() above. 
+ active_rsi.push_back(RowSetInfo(rs, 0)); + active.insert(std::make_pair(rs, &active_rsi.back())); + } else if (rse.endpoint_ == RowSetTree::STOP) { + CHECK_EQ(active.erase(rs), 1); + } else { + LOG(FATAL) << "Undefined RowSet endpoint type.\n" + << "\tExpected either RowSetTree::START=" << RowSetTree::START + << " or RowSetTree::STOP=" << RowSetTree::STOP << ".\n" + << "\tRecieved:\n" + << "\t\tRowSet=" << rs->ToString() << "\n" + << "\t\tKey=" << KUDU_REDACT(next.ToDebugString()) << "\n" + << "\t\tEndpointType=" << rse.endpoint_; + } + } + if (last_bound.compare(stop_key) < 0 || stop_key.empty()) { + ranges->emplace_back(last_bound.ToString(), stop_key.ToString(), chunk_size); + } +} + RowSetInfo::RowSetInfo(RowSet* rs, double init_cdf) : cdf_min_key_(init_cdf), cdf_max_key_(init_cdf), @@ -265,6 +366,10 @@ RowSetInfo::RowSetInfo(RowSet* rs, double init_cdf) size_mb_ = std::max(implicit_cast<int>(extra_->size_bytes / 1024 / 1024), kMinSizeMb); } +uint64_t RowSetInfo::size_bytes(const ColumnId& col_id) const { + return extra_->rowset->OnDiskBaseDataColumnSize(col_id); +} + void RowSetInfo::FinalizeCDFVector(vector<RowSetInfo>* vec, double quot) { if (quot == 0) return; http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/rowset_info.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/rowset_info.h b/src/kudu/tablet/rowset_info.h index 03fb9b7..0186837 100644 --- a/src/kudu/tablet/rowset_info.h +++ b/src/kudu/tablet/rowset_info.h @@ -21,9 +21,15 @@ #include <string> #include <vector> +#include "kudu/gutil/integral_types.h" #include "kudu/gutil/ref_counted.h" +#include "kudu/util/slice.h" namespace kudu { + +class KeyRange; +struct ColumnId; + namespace tablet { class RowSet; @@ -44,6 +50,19 @@ class RowSetInfo { std::vector<RowSetInfo>* min_key, std::vector<RowSetInfo>* max_key); + // Split [start_key, stop_key) into primary key ranges by chunk size. 
+ // + // If col_ids specified, then the size estimate used for 'target_chunk_size' + // should only include these columns. This can be used if a query will + // only scan a certain subset of the columns. + static void SplitKeyRange(const RowSetTree& tree, + Slice start_key, + Slice stop_key, + const std::vector<ColumnId>& col_ids, + uint64 target_chunk_size, + std::vector<KeyRange>* ranges); + + uint64_t size_bytes(const ColumnId& col_id) const; uint64_t size_bytes() const { return extra_->size_bytes; } int size_mb() const { return size_mb_; } http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/tablet-harness.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet-harness.h b/src/kudu/tablet/tablet-harness.h index f344006..c5610ee 100644 --- a/src/kudu/tablet/tablet-harness.h +++ b/src/kudu/tablet/tablet-harness.h @@ -133,6 +133,10 @@ class TabletHarness { return tablet_; } + Tablet* mutable_tablet() { + return tablet_.get(); + } + FsManager* fs_manager() { return fs_manager_.get(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/tablet-test-util.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h index fa8d4f8..06b964e 100644 --- a/src/kudu/tablet/tablet-test-util.h +++ b/src/kudu/tablet/tablet-test-util.h @@ -104,6 +104,10 @@ class KuduTabletTest : public KuduTest { return harness_->tablet(); } + Tablet* mutable_tablet() { + return harness_->mutable_tablet(); + } + TabletHarness* harness() { return harness_.get(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/tablet-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc index 2af4877..e3fac2d 100644 --- a/src/kudu/tablet/tablet-test.cc +++ b/src/kudu/tablet/tablet-test.cc @@ 
-15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "kudu/tablet/tablet.h" + #include <algorithm> #include <cstdint> #include <ctime> @@ -31,7 +33,10 @@ #include "kudu/cfile/cfile_util.h" #include "kudu/common/common.pb.h" +#include "kudu/common/encoded_key.h" #include "kudu/common/iterator.h" +#include "kudu/common/key_range.h" +#include "kudu/common/partial_row.h" #include "kudu/common/rowblock.h" #include "kudu/common/schema.h" #include "kudu/common/timestamp.h" @@ -39,22 +44,26 @@ #include "kudu/fs/block_manager.h" #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/port.h" +#include "kudu/gutil/ref_counted.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/join.h" #include "kudu/tablet/delta_key.h" #include "kudu/tablet/delta_stats.h" #include "kudu/tablet/deltafile.h" #include "kudu/tablet/local_tablet_writer.h" +#include "kudu/tablet/mock-rowsets.h" #include "kudu/tablet/mvcc.h" +#include "kudu/tablet/rowset.h" #include "kudu/tablet/rowset_metadata.h" +#include "kudu/tablet/rowset_tree.h" #include "kudu/tablet/tablet-test-base.h" #include "kudu/tablet/tablet-test-util.h" -#include "kudu/tablet/tablet.h" #include "kudu/tablet/tablet.pb.h" #include "kudu/tablet/tablet_metadata.h" #include "kudu/tablet/tablet_metrics.h" // IWYU pragma: keep #include "kudu/util/faststring.h" #include "kudu/util/jsonwriter.h" +#include "kudu/util/memory/arena.h" #include "kudu/util/metrics.h" #include "kudu/util/status.h" #include "kudu/util/stopwatch.h" @@ -1136,5 +1145,260 @@ TEST(TestTablet, TestGetReplaySizeForIndex) { EXPECT_EQ(Tablet::GetReplaySizeForIndex(min_log_index, replay_size_map), 0); } +class TestTabletStringKey : public TestTablet<StringKeyTestSetup> { +public: + void AssertChunks(vector<KeyRange> expected, vector<KeyRange> actual) { + ASSERT_EQ(actual.size(), expected.size()); + for (size_t idx = 0; idx < actual.size(); ++idx) { + ASSERT_STREQ(actual[idx].start_primary_key().c_str(), + 
expected[idx].start_primary_key().c_str()); + ASSERT_STREQ(actual[idx].stop_primary_key().c_str(), + expected[idx].stop_primary_key().c_str()); + ASSERT_EQ(actual[idx].size_bytes(), expected[idx].size_bytes()); + } + } +}; + +// Test for split key range +TEST_F(TestTabletStringKey, TestSplitKeyRange) { + Tablet* tablet = this->mutable_tablet(); + + scoped_refptr<TabletComponents> comps; + tablet->GetComponents(&comps); + + RowSetVector old_rowset = comps->rowsets->all_rowsets(); + RowSetVector new_rowset = { + std::make_shared<MockDiskRowSet>("0", "9", 9000, 90), + std::make_shared<MockDiskRowSet>("2", "5", 3000, 30), + std::make_shared<MockDiskRowSet>("5", "6", 1000, 10) + }; + tablet->AtomicSwapRowSets(old_rowset, new_rowset); + { + std::vector<KeyRange> result = { + KeyRange("", "2", 2000), + KeyRange("2", "5", 6000), + KeyRange("5", "6", 2000), + KeyRange("6", "", 3000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range); + AssertChunks(result, range); + } + // target chunk size less than the min interval size + { + std::vector<KeyRange> result = { + KeyRange("", "2", 2000), + KeyRange("2", "5", 6000), + KeyRange("5", "6", 2000), + KeyRange("6", "", 3000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 900, &range); + AssertChunks(result, range); + } + // target chunk size greater than the max interval size + { + std::vector<KeyRange> result = { + KeyRange("", "", 13000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 20000, &range); + AssertChunks(result, range); + } + // test split key range with column + { + std::vector<KeyRange> result = { + KeyRange("", "2", 40), + KeyRange("2", "5", 120), + KeyRange("5", "6", 40), + KeyRange("6", "", 60) + }; + std::vector<ColumnId> col_ids; + col_ids.emplace_back(0); + col_ids.emplace_back(1); + 
std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 60, &range); + AssertChunks(result, range); + } + // test split key range with bound + { + gscoped_ptr<EncodedKey> l_enc_key; + gscoped_ptr<EncodedKey> u_enc_key; + Arena arena(256); + KuduPartialRow lower_bound(&this->schema_); + CHECK_OK(lower_bound.SetString("key", "1")); + CHECK_OK(lower_bound.SetInt32("key_idx", 0)); + CHECK_OK(lower_bound.SetInt32("val", 0)); + string l_encoded; + ASSERT_OK(lower_bound.EncodeRowKey(&l_encoded)); + ASSERT_OK(EncodedKey::DecodeEncodedString(this->schema_, &arena, l_encoded, &l_enc_key)); + + KuduPartialRow upper_bound(&this->schema_); + CHECK_OK(upper_bound.SetString("key", "4")); + CHECK_OK(upper_bound.SetInt32("key_idx", 0)); + CHECK_OK(upper_bound.SetInt32("val", 0)); + string u_encoded; + ASSERT_OK(upper_bound.EncodeRowKey(&u_encoded)); + ASSERT_OK(EncodedKey::DecodeEncodedString(this->schema_, &arena, u_encoded, &u_enc_key)); + // split key range in [1, 4) + { + std::vector<KeyRange> result = { + KeyRange("1", "2", 1000), + KeyRange("2", "4", 4000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(l_enc_key.get(), u_enc_key.get(), col_ids, 2000, &range); + AssertChunks(result, range); + } + // split key range in [min, 4) + { + std::vector<KeyRange> result = { + KeyRange("", "2", 2000), + KeyRange("2", "4", 4000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, u_enc_key.get(), col_ids, 2000, &range); + AssertChunks(result, range); + } + // split key range in [4, max) + { + std::vector<KeyRange> result = { + KeyRange("4", "5", 2000), + KeyRange("5", "6", 2000), + KeyRange("6", "", 3000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(u_enc_key.get(), nullptr, col_ids, 2000, &range); + AssertChunks(result, range); + } + } +} + +// Test for split key range, tablet with 0 rowsets +TEST_F(TestTabletStringKey, 
TestSplitKeyRangeWithZeroRowSets) { + Tablet* tablet = this->mutable_tablet(); + + scoped_refptr<TabletComponents> comps; + tablet->GetComponents(&comps); + + RowSetVector old_rowset = comps->rowsets->all_rowsets(); + RowSetVector new_rowset = {}; + tablet->AtomicSwapRowSets(old_rowset, new_rowset); + { + std::vector<KeyRange> result = { + KeyRange("", "", 0) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range); + AssertChunks(result, range); + } +} + +// Test for split key range, tablet with 1 rowset +TEST_F(TestTabletStringKey, TestSplitKeyRangeWithOneRowSet) { + Tablet* tablet = this->mutable_tablet(); + + scoped_refptr<TabletComponents> comps; + tablet->GetComponents(&comps); + + RowSetVector old_rowset = comps->rowsets->all_rowsets(); + RowSetVector new_rowset = { + std::make_shared<MockDiskRowSet>("0", "9", 9000, 90) + }; + tablet->AtomicSwapRowSets(old_rowset, new_rowset); + { + std::vector<KeyRange> result = { + KeyRange("", "", 9000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range); + AssertChunks(result, range); + } +} + +// Test for split key range, tablet with non-overlapping rowsets +TEST_F(TestTabletStringKey, TestSplitKeyRangeWithNonOverlappingRowSets) { + Tablet *tablet = this->mutable_tablet(); + + // Rowsets without gaps + scoped_refptr<TabletComponents> comps; + tablet->GetComponents(&comps); + RowSetVector old_rowset = comps->rowsets->all_rowsets(); + RowSetVector without_gaps_rowset = { + std::make_shared<MockDiskRowSet>("0", "2", 2000, 20), + std::make_shared<MockDiskRowSet>("2", "5", 3000, 30), + std::make_shared<MockDiskRowSet>("5", "6", 1000, 10), + std::make_shared<MockDiskRowSet>("6", "9", 3000, 30) + }; + tablet->AtomicSwapRowSets(old_rowset, without_gaps_rowset); + { + std::vector<KeyRange> result = { + KeyRange("", "2", 2000), + KeyRange("2", "5", 3000), + KeyRange("5", 
"", 4000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range); + AssertChunks(result, range); + } + + // Rowsets with gaps + tablet->GetComponents(&comps); + old_rowset = comps->rowsets->all_rowsets(); + RowSetVector with_gaps_rowset = { + std::make_shared<MockDiskRowSet>("0", "2", 2000, 20), + std::make_shared<MockDiskRowSet>("5", "6", 1000, 10), + std::make_shared<MockDiskRowSet>("6", "9", 3000, 30) + }; + tablet->AtomicSwapRowSets(old_rowset, with_gaps_rowset); + { + std::vector<KeyRange> result = { + KeyRange("", "6", 3000), + KeyRange("6", "", 3000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range); + AssertChunks(result, range); + } +} + +// Test for split key range, tablet with rowset whose start is the minimum value +TEST_F(TestTabletStringKey, TestSplitKeyRangeWithMinimumValueRowSet) { + Tablet *tablet = this->mutable_tablet(); + + // Rowsets without gaps + scoped_refptr<TabletComponents> comps; + tablet->GetComponents(&comps); + RowSetVector old_rowset = comps->rowsets->all_rowsets(); + RowSetVector without_gaps_rowset = { + std::make_shared<MockDiskRowSet>("", "2", 2500, 20), + std::make_shared<MockDiskRowSet>("2", "5", 3000, 30), + std::make_shared<MockDiskRowSet>("5", "6", 1000, 10), + std::make_shared<MockDiskRowSet>("6", "9", 3000, 30) + }; + tablet->AtomicSwapRowSets(old_rowset, without_gaps_rowset); + { + std::vector<KeyRange> result = { + KeyRange("", "2", 2500), + KeyRange("2", "5", 3000), + KeyRange("5", "", 4000) + }; + std::vector<ColumnId> col_ids; + std::vector<KeyRange> range; + tablet->SplitKeyRange(nullptr, nullptr, col_ids, 3000, &range); + AssertChunks(result, range); + } +} + } // namespace tablet } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/tablet.cc ---------------------------------------------------------------------- 
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc index b81f351..4fa272f 100644 --- a/src/kudu/tablet/tablet.cc +++ b/src/kudu/tablet/tablet.cc @@ -374,6 +374,28 @@ BloomFilterSizing Tablet::DefaultBloomSizing() { FLAGS_tablet_bloom_target_fp_rate); } +void Tablet::SplitKeyRange(const EncodedKey* start_key, + const EncodedKey* stop_key, + const std::vector<ColumnId>& column_ids, + uint64 target_chunk_size, + std::vector<KeyRange>* ranges) { + shared_ptr<RowSetTree> rowsets_copy; + { + shared_lock<rw_spinlock> l(component_lock_); + rowsets_copy = components_->rowsets; + } + // A null key is passed down as an empty Slice (no bound on that side). + Slice start, stop; + if (start_key != nullptr) { + start = start_key->encoded_key(); + } + if (stop_key != nullptr) { + stop = stop_key->encoded_key(); + } + RowSetInfo::SplitKeyRange(*rowsets_copy, start, stop, + column_ids, target_chunk_size, ranges); +} + Status Tablet::NewRowIterator(const Schema &projection, gscoped_ptr<RowwiseIterator> *iter) const { // Yield current rows. http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tablet/tablet.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h index af6797c..b2262c5 100644 --- a/src/kudu/tablet/tablet.h +++ b/src/kudu/tablet/tablet.h @@ -36,6 +36,7 @@ #include "kudu/common/schema.h" #include "kudu/fs/io_context.h" #include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/integral_types.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/port.h" #include "kudu/gutil/ref_counted.h" @@ -55,6 +56,8 @@ namespace kudu { class ConstContiguousRow; +class EncodedKey; +class KeyRange; class MaintenanceManager; class MaintenanceOp; class MaintenanceOpStats; @@ -433,10 +436,26 @@ class Tablet { // Return the default bloom filter sizing parameters, configured by server flags. static BloomFilterSizing DefaultBloomSizing(); + // Split [start_key, stop_key) into primary key ranges by chunk size.
+ // A null 'start_key' or 'stop_key' leaves that side of the range unbounded. + // If column_ids specified, then the size estimate used for 'target_chunk_size' + // should only include these columns. This can be used if a query will + // only scan a certain subset of the columns. + void SplitKeyRange(const EncodedKey* start_key, + const EncodedKey* stop_key, + const std::vector<ColumnId>& column_ids, + uint64 target_chunk_size, + std::vector<KeyRange>* ranges); + private: friend class Iterator; friend class TabletReplicaTest; FRIEND_TEST(TestTablet, TestGetReplaySizeForIndex); + FRIEND_TEST(TestTabletStringKey, TestSplitKeyRange); + FRIEND_TEST(TestTabletStringKey, TestSplitKeyRangeWithZeroRowSets); + FRIEND_TEST(TestTabletStringKey, TestSplitKeyRangeWithOneRowSet); + FRIEND_TEST(TestTabletStringKey, TestSplitKeyRangeWithNonOverlappingRowSets); + FRIEND_TEST(TestTabletStringKey, TestSplitKeyRangeWithMinimumValueRowSet); // Lifecycle states that a Tablet can be in. Legal state transitions for a // Tablet object: http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tserver/tablet_service.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc index 9be9b85..4b6f3de 100644 --- a/src/kudu/tserver/tablet_service.cc +++ b/src/kudu/tserver/tablet_service.cc @@ -41,6 +41,7 @@ #include "kudu/common/encoded_key.h" #include "kudu/common/iterator.h" #include "kudu/common/iterator_stats.h" +#include "kudu/common/key_range.h" #include "kudu/common/partition.h" #include "kudu/common/rowblock.h" #include "kudu/common/scan_spec.h" @@ -1387,6 +1388,113 @@ void TabletServiceImpl::ListTablets(const ListTabletsRequestPB* req, context->RespondSuccess(); } +void TabletServiceImpl::SplitKeyRange(const SplitKeyRangeRequestPB* req, + SplitKeyRangeResponsePB* resp, + rpc::RpcContext* context) { + TRACE_EVENT1("tserver", "TabletServiceImpl::SplitKeyRange", + "tablet_id", req->tablet_id()); + DVLOG(3) << "Received SplitKeyRange RPC: " <<
SecureDebugString(*req); + + scoped_refptr<TabletReplica> replica; + if (!LookupRunningTabletReplicaOrRespond(server_->tablet_manager(), req->tablet_id(), resp, + context, &replica)) { + return; + } + + shared_ptr<Tablet> tablet; + TabletServerErrorPB::Code error_code; + Status s = GetTabletRef(replica, &tablet, &error_code); + if (PREDICT_FALSE(!s.ok())) { + SetupErrorAndRespond(resp->mutable_error(), s, error_code, context); + return; + } + + // Decode the optional encoded start and stop primary keys. + Arena arena(256); + Schema tablet_schema = replica->tablet_metadata()->schema(); + gscoped_ptr<EncodedKey> start, stop; + if (req->has_start_primary_key()) { + s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->start_primary_key(), &start); + if (PREDICT_FALSE(!s.ok())) { + SetupErrorAndRespond(resp->mutable_error(), + Status::InvalidArgument("Invalid SplitKeyRange start primary key"), + TabletServerErrorPB::UNKNOWN_ERROR, + context); + return; + } + } + if (req->has_stop_primary_key()) { + s = EncodedKey::DecodeEncodedString(tablet_schema, &arena, req->stop_primary_key(), &stop); + if (PREDICT_FALSE(!s.ok())) { + SetupErrorAndRespond(resp->mutable_error(), + Status::InvalidArgument("Invalid SplitKeyRange stop primary key"), + TabletServerErrorPB::UNKNOWN_ERROR, + context); + return; + } + } + if (req->has_start_primary_key() && req->has_stop_primary_key()) { + // If both keys are set, reject a start key that sorts after the stop key. + if (start->encoded_key().compare(stop->encoded_key()) > 0) { + SetupErrorAndRespond(resp->mutable_error(), + Status::InvalidArgument("Invalid primary key range"), + TabletServerErrorPB::UNKNOWN_ERROR, + context); + return; + } + } + + // Parse the requested columns; clients must identify them by name only. + Schema schema; + s = ColumnPBsToSchema(req->columns(), &schema); + if (PREDICT_FALSE(!s.ok())) { + SetupErrorAndRespond(resp->mutable_error(), + s, + TabletServerErrorPB::INVALID_SCHEMA, + context); + return; + } + if (schema.has_column_ids()) { + SetupErrorAndRespond(resp->mutable_error(),
Status::InvalidArgument("User requests should not have Column IDs"), + TabletServerErrorPB::INVALID_SCHEMA, + context); + return; + } + // Map each column name to its ColumnId (find_column() returns an index). + vector<ColumnId> column_ids; + for (const ColumnSchema& column : schema.columns()) { + int column_idx = tablet_schema.find_column(column.name()); + if (PREDICT_FALSE(column_idx == Schema::kColumnNotFound)) { + SetupErrorAndRespond(resp->mutable_error(), + Status::InvalidArgument( + "Invalid SplitKeyRange column name", column.name()), + TabletServerErrorPB::INVALID_SCHEMA, + context); + return; + } + column_ids.emplace_back(tablet_schema.column_id(column_idx)); + } + + // Reject a zero target chunk size. + if (req->target_chunk_size_bytes() == 0) { + SetupErrorAndRespond(resp->mutable_error(), + Status::InvalidArgument("Invalid SplitKeyRange target chunk size"), + TabletServerErrorPB::UNKNOWN_ERROR, + context); + return; + } + + vector<KeyRange> ranges; + tablet->SplitKeyRange(start.get(), stop.get(), column_ids, + req->target_chunk_size_bytes(), &ranges); + for (const auto& range : ranges) { + range.ToPB(resp->add_ranges()); + } + + context->RespondSuccess(); +} + void TabletServiceImpl::Checksum(const ChecksumRequestPB* req, ChecksumResponsePB* resp, rpc::RpcContext* context) { http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tserver/tablet_service.h ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h index 8b3261e..1c67a94 100644 --- a/src/kudu/tserver/tablet_service.h +++ b/src/kudu/tserver/tablet_service.h @@ -123,6 +123,10 @@ class TabletServiceImpl : public TabletServerServiceIf { ListTabletsResponsePB* resp, rpc::RpcContext* context) OVERRIDE; + virtual void SplitKeyRange(const SplitKeyRangeRequestPB* req, + SplitKeyRangeResponsePB* resp, + rpc::RpcContext* context) OVERRIDE; + virtual void Checksum(const ChecksumRequestPB* req, ChecksumResponsePB* resp, rpc::RpcContext* context) OVERRIDE;
http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tserver/tserver.proto ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto index 77d4757..f9f8cef 100644 --- a/src/kudu/tserver/tserver.proto +++ b/src/kudu/tserver/tserver.proto @@ -385,6 +385,34 @@ message ScannerKeepAliveResponsePB { optional TabletServerErrorPB error = 1; } +// Request to split a tablet's primary key space into key ranges by size. +// Serving this request does not change the layout of the tablet. +message SplitKeyRangeRequestPB { + required bytes tablet_id = 1; // The tablet whose key space is split. + + // Encoded primary key where the range to split begins (inclusive); unset means unbounded. + optional bytes start_primary_key = 2 [(kudu.REDACT) = true]; + // Encoded primary key where the range ends (exclusive); must not sort before the start key. + optional bytes stop_primary_key = 3 [(kudu.REDACT) = true]; + + // Desired size of each chunk, in bytes; must be non-zero. This is a hint. + // The tablet server may return chunks larger or smaller than this value. + optional uint64 target_chunk_size_bytes = 4; + + // The columns to consider when chunking, by name (column IDs must not be set). + // If specified, then the size estimate used for 'target_chunk_size_bytes' + // should only include these columns. This can be used if a query will + // only scan a certain subset of the columns. + repeated ColumnSchemaPB columns = 5; +} + +message SplitKeyRangeResponsePB { + // The error, if an error occurred with this request.
+ optional TabletServerErrorPB error = 1; + // The primary key ranges the tablet's key space was split into. + repeated KeyRangePB ranges = 2; +} + enum TabletServerFeatures { UNKNOWN_FEATURE = 0; COLUMN_PREDICATES = 1; http://git-wip-us.apache.org/repos/asf/kudu/blob/d4ded71b/src/kudu/tserver/tserver_service.proto ---------------------------------------------------------------------- diff --git a/src/kudu/tserver/tserver_service.proto b/src/kudu/tserver/tserver_service.proto index daf650b..78b99a3 100644 --- a/src/kudu/tserver/tserver_service.proto +++ b/src/kudu/tserver/tserver_service.proto @@ -44,6 +44,9 @@ service TabletServerService { rpc ListTablets(ListTabletsRequestPB) returns (ListTabletsResponsePB) { option (kudu.rpc.authz_method) = "AuthorizeClient"; } + rpc SplitKeyRange(SplitKeyRangeRequestPB) returns (SplitKeyRangeResponsePB) { + option (kudu.rpc.authz_method) = "AuthorizeClient"; + } // Run full-scan data checksum on a tablet to verify data integrity. //
