This is an automated email from the ASF dual-hosted git repository. laiyingchun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 936d7edc4e4b69d2e1f1dffc96760cb3fd57a934 Author: zhangyifan27 <[email protected]> AuthorDate: Mon Apr 18 21:17:00 2022 +0800 KUDU-1644: Simplify InList predicate values based on rowset PK bounds Previously we only optimized InList predicates based on tablet PK bounds, but we can also optimize them at the DRS level. By adding the implicit PK bounds, InList predicates can be simplified. Also, the DRS bounds info can be used to skip rows effectively when we have a predicate on a non-prefix of the primary key and the leading column(s) have cardinality=1 (as described in KUDU-1291). Benchmark test results (in slow mode): before: Selected 10000 rows cost 2.519996 seconds. # PredicateOnFirstColumn Selected 100 rows cost 2.040003 seconds. # PredicateOnSecondColumn after: Selected 10000 rows cost 1.771755 seconds. # PredicateOnFirstColumn Selected 100 rows cost 0.131996 seconds. # PredicateOnSecondColumn Change-Id: Ia9c2aa958f19a0b62e40a2ef5eb5365f91cbab80 Reviewed-on: http://gerrit.cloudera.org:8080/18434 Tested-by: Kudu Jenkins Reviewed-by: Yingchun Lai <[email protected]> --- src/kudu/common/generic_iterators.cc | 3 +- src/kudu/common/scan_spec.cc | 28 +++--- src/kudu/common/scan_spec.h | 4 + src/kudu/tablet/cfile_set-test.cc | 188 ++++++++++++++++++++++++++++++++--- src/kudu/tablet/cfile_set.cc | 53 ++++++++-- src/kudu/tablet/cfile_set.h | 10 +- 6 files changed, 247 insertions(+), 39 deletions(-) diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc index 46fc55055..10457e4d8 100644 --- a/src/kudu/common/generic_iterators.cc +++ b/src/kudu/common/generic_iterators.cc @@ -1175,7 +1175,8 @@ Status MaterializingIterator::Init(ScanSpec *spec) { const int32_t num_columns = schema().num_columns(); col_idx_predicates_.clear(); non_predicate_column_indexes_.clear(); - if (PREDICT_FALSE(!disallow_pushdown_for_tests_) && spec != nullptr) { + if (PREDICT_TRUE(!disallow_pushdown_for_tests_) && spec != nullptr && + !spec->predicates().empty()) {
col_idx_predicates_.reserve(spec->predicates().size()); DCHECK_GE(num_columns, spec->predicates().size()); non_predicate_column_indexes_.reserve(num_columns - spec->predicates().size()); diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc index 688a5e2e3..3fdfaa837 100644 --- a/src/kudu/common/scan_spec.cc +++ b/src/kudu/common/scan_spec.cc @@ -147,19 +147,15 @@ string ScanSpec::ToString(const Schema& schema) const { return JoinStrings(preds, " AND ") + limit; } -void ScanSpec::OptimizeScan(const Schema& schema, - Arena* arena, - bool remove_pushed_predicates) { - // Don't bother if we can already short circuit the scan. This also lets us - // rely on lower_bound_key_ < exclusive_upper_bound_key_ and no None - // predicates in the optimization step. - if (!CanShortCircuit()) { - LiftPrimaryKeyBounds(schema, arena); - // Predicates may be has None, after merge PrimaryKeyBounds and Predicates - if (!CanShortCircuit()) { - PushPredicatesIntoPrimaryKeyBounds(schema, arena, remove_pushed_predicates); - } - } +void ScanSpec::UnifyPrimaryKeyBoundsAndColumnPredicates(const Schema& schema, + Arena* arena, + bool remove_pushed_predicates) { + LiftPrimaryKeyBounds(schema, arena); + PushPredicatesIntoPrimaryKeyBounds(schema, arena, remove_pushed_predicates); +} + +void ScanSpec::OptimizeScan(const Schema& schema, Arena* arena, bool remove_pushed_predicates) { + UnifyPrimaryKeyBoundsAndColumnPredicates(schema, arena, remove_pushed_predicates); // KUDU-1652: Filter IS NOT NULL predicates for non-nullable columns. for (auto itr = predicates_.begin(); itr != predicates_.end(); ) { @@ -227,6 +223,9 @@ vector<ColumnSchema> ScanSpec::GetMissingColumns(const Schema& projection) { void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema, Arena* arena, bool remove_pushed_predicates) { + if (CanShortCircuit()) { + return; + } // Step 1: load key column predicate values into a pair of rows. 
uint8_t* lower_buf = static_cast<uint8_t*>( CHECK_NOTNULL(arena->AllocateBytes(schema.key_byte_size()))); @@ -279,6 +278,9 @@ void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema, } void ScanSpec::LiftPrimaryKeyBounds(const Schema& schema, Arena* arena) { + if (CanShortCircuit()) { + return; + } if (lower_bound_key_ == nullptr && exclusive_upper_bound_key_ == nullptr) { return; } int32_t num_key_columns = schema.num_key_columns(); for (int32_t col_idx = 0; col_idx < num_key_columns; col_idx++) { diff --git a/src/kudu/common/scan_spec.h b/src/kudu/common/scan_spec.h index 506612d5b..e53474e9e 100644 --- a/src/kudu/common/scan_spec.h +++ b/src/kudu/common/scan_spec.h @@ -67,6 +67,10 @@ class ScanSpec { // into the upper or lower primary key bounds are removed. // // Idempotent. + void UnifyPrimaryKeyBoundsAndColumnPredicates(const Schema& schema, + Arena* arena, + bool remove_pushed_predicates); + void OptimizeScan(const Schema& schema, Arena* arena, bool remove_pushed_predicates); diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc index d3f66c376..6b115cbe9 100644 --- a/src/kudu/tablet/cfile_set-test.cc +++ b/src/kudu/tablet/cfile_set-test.cc @@ -47,6 +47,7 @@ #include "kudu/gutil/port.h" #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/stringpiece.h" +#include "kudu/gutil/strings/substitute.h" #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/tablet-test-util.h" #include "kudu/util/block_bloom_filter.h" @@ -57,7 +58,9 @@ #include "kudu/util/memory/arena.h" #include "kudu/util/slice.h" #include "kudu/util/status.h" +#include "kudu/util/stopwatch.h" #include "kudu/util/test_macros.h" +#include "kudu/util/test_util.h" DECLARE_int32(cfile_default_block_size); @@ -69,6 +72,18 @@ using std::vector; namespace kudu { namespace tablet { +ScanSpec GetInListScanSpec(const ColumnSchema& col_schema, const vector<int>& value_list) { + vector<const void*> pred_list; + pred_list.reserve(value_list.size()); 
+ for (int i = 0; i < value_list.size(); i++) { + pred_list.emplace_back(&value_list[i]); + } + ColumnPredicate pred = ColumnPredicate::InList(col_schema, &pred_list); + ScanSpec spec; + spec.AddPredicate(pred); + return spec; +} + class TestCFileSet : public KuduRowSetTest { public: TestCFileSet() : @@ -161,16 +176,16 @@ class TestCFileSet : public KuduRowSetTest { uint32_t hash2 = HashUtil::ComputeHash32(second, FAST_HASH, 0); if (bf1_contain->Find(hash1)) { - ret1_contain->push_back(i); + ret1_contain->push_back(curr1); } if (bf1_exclude->Find(hash1)) { - ret1_exclude->push_back(i); + ret1_exclude->push_back(curr1); } if (bf2_contain->Find(hash2)) { - ret2_contain->push_back(i); + ret2_contain->push_back(curr1); } if (bf2_exclude->Find(hash2)) { - ret2_exclude->push_back(i); + ret2_exclude->push_back(curr1); } } } @@ -227,7 +242,7 @@ class TestCFileSet : public KuduRowSetTest { spec.AddPredicate(pred); } ASSERT_OK(iter->Init(&spec)); - // Check that the range was respected on all the results. + // Check that InBloomFilter predicates were respected on all the results. RowBlockMemory mem(1024); RowBlock block(&schema_, 100, &mem); while (iter->HasNext()) { @@ -235,8 +250,8 @@ class TestCFileSet : public KuduRowSetTest { for (size_t i = 0; i < block.nrows(); i++) { if (block.selection_vector()->IsRowSelected(i)) { RowBlockRow row = block.row(i); - size_t index = row.row_index(); - auto target_iter = std::find(target.begin(), target.end(), index); + int32_t row_key = *schema_.ExtractColumnFromRow<INT32>(row, 0); + auto target_iter = std::find(target.begin(), target.end(), row_key); if (target_iter == target.end()) { FAIL() << "Row " << schema_.DebugRow(row) << " should not have " << "passed predicate "; @@ -252,9 +267,49 @@ class TestCFileSet : public KuduRowSetTest { } } - Status MaterializeColumn(CFileSet::Iterator *iter, - size_t col_idx, - ColumnBlock *cb) { + // Issue a InList scan and verify that all result rows indeed fall inside that predicate. 
+ void DoTestInListScan(const shared_ptr<CFileSet>& fileset, int upper_bound, int interval) { + // Create iterator. + unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr)); + unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter))); + + // Create a scan with a InList predicate on the key column. + vector<int> value_list; + vector<int> result_list; + for (int i = 0; i < upper_bound; i += interval) { + value_list.emplace_back(i * kRatio[0]); + result_list.emplace_back(i * kRatio[0]); + } + auto spec = GetInListScanSpec(schema_.column(0), value_list); + ASSERT_OK(iter->Init(&spec)); + + // Check that the InList predicate was respected on all the results. + RowBlockMemory mem(1024); + RowBlock block(&schema_, 100, &mem); + int selected_size = 0; + while (iter->HasNext()) { + mem.Reset(); + ASSERT_OK_FAST(iter->NextBlock(&block)); + for (size_t i = 0; i < block.nrows(); i++) { + if (block.selection_vector()->IsRowSelected(i)) { + RowBlockRow row = block.row(i); + int32_t row_key = *schema_.ExtractColumnFromRow<INT32>(row, 0); + auto target_iter = std::find(result_list.begin(), result_list.end(), row_key); + if (target_iter == result_list.end()) { + FAIL() << "Row " << schema_.DebugRow(row) << " should not have passed predicate."; + } + result_list.erase(target_iter); + } + } + selected_size += block.selection_vector()->CountSelected(); + } + LOG(INFO) << "Selected size: " << selected_size; + if (!result_list.empty()) { + FAIL() << result_list.size() << " values should have passed predicate."; + } + } + + static Status MaterializeColumn(CFileSet::Iterator* iter, size_t col_idx, ColumnBlock* cb) { SelectionVector sel(cb->nrows()); ColumnMaterializationContext ctx(col_idx, nullptr, cb, &sel); return iter->MaterializeColumn(&ctx); @@ -544,14 +599,10 @@ TEST_F(TestCFileSet, TestBloomFilterPredicates) { // BloomFilter of column 0 contain with lower and upper bound. 
int32_t lower = 8; int32_t upper = 58; - int32_t lower_row_index = lower / 2; - int32_t upper_row_index = upper / 2; vector<size_t> ret1_contain_range = ret1_contain; - auto left = std::lower_bound(ret1_contain_range.begin(), - ret1_contain_range.end(), lower_row_index); + auto left = std::lower_bound(ret1_contain_range.begin(), ret1_contain_range.end(), lower); ret1_contain_range.erase(ret1_contain_range.begin(), left); // don't erase left - auto right = std::lower_bound(ret1_contain_range.begin(), - ret1_contain_range.end(), upper_row_index); + auto right = std::lower_bound(ret1_contain_range.begin(), ret1_contain_range.end(), upper); ret1_contain_range.erase(right, ret1_contain_range.end()); // earse right auto range = ColumnPredicate::Range(schema_.column(0), &lower, &upper); DoTestBloomFilterScan(fileset, { pred1_contain, range }, ret1_contain_range); @@ -562,5 +613,110 @@ TEST_F(TestCFileSet, TestBloomFilterPredicates) { DoTestBloomFilterScan(fileset, { bf_with_range }, ret1_contain_range); } +TEST_F(TestCFileSet, TestInListPredicates) { + const int kNumRows = 10000; + WriteTestRowSet(kNumRows); + + shared_ptr<CFileSet> fileset; + ASSERT_OK(CFileSet::Open( + rowset_meta_, MemTracker::GetRootTracker(), MemTracker::GetRootTracker(), nullptr, &fileset)); + + // Test different size and interval. + DoTestInListScan(fileset, 1, 1); + DoTestInListScan(fileset, 10, 1); + DoTestInListScan(fileset, 100, 5); + DoTestInListScan(fileset, 1000, 10); +} + +class InListPredicateBenchmark : public KuduRowSetTest { + public: + InListPredicateBenchmark() + : KuduRowSetTest(Schema({ColumnSchema("c0", INT32), ColumnSchema("c1", INT32)}, 2)) {} + + void SetUp() OVERRIDE { + KuduRowSetTest::SetUp(); + + // Use a small cfile block size, so that when we skip materializing a given + // column for 10,000 rows, it can actually skip over a number of blocks. + FLAGS_cfile_default_block_size = 512; + } + + // Write out a test rowset with two int columns. 
+ // The two columns make up a composite primary key. + // The first column contains only one value 1. + // The second contains the row index * 10. + void WriteTestRowSet(int nrows) { + DiskRowSetWriter rsw( + rowset_meta_.get(), &schema_, BloomFilterSizing::BySizeAndFPRate(32 * 1024, 0.01F)); + + ASSERT_OK(rsw.Open()); + + RowBuilder rb(&schema_); + for (int i = 0; i < nrows; i++) { + rb.Reset(); + rb.AddInt32(1); + rb.AddInt32(i * 10); + ASSERT_OK_FAST(WriteRow(rb.data(), &rsw)); + } + ASSERT_OK(rsw.Finish()); + } + + void BenchmarkInListScan(const ColumnSchema& col_schema, const vector<int>& value_list) { + // Write some rows and open the fileset. + const int kNumRows = 10000; + const int kNumIters = AllowSlowTests() ? 10000 : 100; + WriteTestRowSet(kNumRows); + shared_ptr<CFileSet> fileset; + ASSERT_OK(CFileSet::Open(rowset_meta_, + MemTracker::GetRootTracker(), + MemTracker::GetRootTracker(), + nullptr, + &fileset)); + Stopwatch sw; + sw.start(); + int selected_size = 0; + for (int i = 0; i < kNumIters; i++) { + // LOG(INFO) << "Start scan"; + // Create iterator. + unique_ptr<CFileSet::Iterator> cfile_iter(fileset->NewIterator(&schema_, nullptr)); + unique_ptr<RowwiseIterator> iter(NewMaterializingIterator(std::move(cfile_iter))); + // Create a scan with a InList predicate on the key column. + auto spec = GetInListScanSpec(col_schema, value_list); + ASSERT_OK(iter->Init(&spec)); + RowBlockMemory mem(1024); + RowBlock block(&schema_, 100, &mem); + selected_size = 0; + while (iter->HasNext()) { + mem.Reset(); + ASSERT_OK_FAST(iter->NextBlock(&block)); + selected_size += block.selection_vector()->CountSelected(); + } + } + sw.stop(); + LOG(INFO) << Substitute( + "Selected $0 rows cost $1 seconds.", selected_size, sw.elapsed().user_cpu_seconds()); + } +}; + +TEST_F(InListPredicateBenchmark, PredicateOnFirstColumn) { + // Test an "IN" predicate on the first column which could be optimized to a "=" predicate. 
+ vector<int> value_list; + value_list.reserve(100); + for (int i = 0; i < 100; i++) { + value_list.emplace_back(i); + } + BenchmarkInListScan(schema_.column(0), value_list); +} + +TEST_F(InListPredicateBenchmark, PredicateOnSecondColumn) { + // Test an "IN" predicate on the second column which could be optimized to skip unnecessary rows. + vector<int> value_list; + value_list.reserve(100); + for (int i = 100; i < 200; i++) { + value_list.emplace_back(i * 10); + } + BenchmarkInListScan(schema_.column(1), value_list); +} + } // namespace tablet } // namespace kudu diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc index 6a121588e..e145edbae 100644 --- a/src/kudu/tablet/cfile_set.cc +++ b/src/kudu/tablet/cfile_set.cc @@ -390,6 +390,7 @@ Status CFileSet::Iterator::Init(ScanSpec *spec) { CHECK(!initted_); RETURN_NOT_OK(base_data_->CountRows(io_context_, &row_count_)); + CHECK_GT(row_count_, 0); // Setup key iterator. RETURN_NOT_OK(base_data_->NewKeyIterator(io_context_, &key_iter_)); @@ -397,9 +398,17 @@ Status CFileSet::Iterator::Init(ScanSpec *spec) { // Setup column iterators. RETURN_NOT_OK(CreateColumnIterators(spec)); - // If there is a range predicate on the key column, push that down into an - // ordinal range. - RETURN_NOT_OK(PushdownRangeScanPredicate(spec)); + lower_bound_idx_ = 0; + upper_bound_idx_ = row_count_; + RETURN_NOT_OK(OptimizePKPredicates(spec)); + if (spec != nullptr && spec->CanShortCircuit()) { + lower_bound_idx_ = row_count_; + spec->RemovePredicates(); + } else { + // If there is a range predicate on the key column, push that down into an + // ordinal range. 
+ RETURN_NOT_OK(PushdownRangeScanPredicate(spec)); + } initted_ = true; @@ -410,12 +419,42 @@ Status CFileSet::Iterator::Init(ScanSpec *spec) { return Status::OK(); } -Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec *spec) { - CHECK_GT(row_count_, 0); +Status CFileSet::Iterator::OptimizePKPredicates(ScanSpec* spec) { + if (spec == nullptr) { + // No predicate. + return Status::OK(); + } - lower_bound_idx_ = 0; - upper_bound_idx_ = row_count_; + const EncodedKey* lb_key = spec->lower_bound_key(); + const EncodedKey* ub_key = spec->exclusive_upper_bound_key(); + EncodedKey* implicit_lb_key = nullptr; + EncodedKey* implicit_ub_key = nullptr; + bool modify_lower_bound_key = false; + bool modify_upper_bound_key = false; + const Schema& tablet_schema = *base_data_->tablet_schema(); + + if (!lb_key || lb_key->encoded_key() < base_data_->min_encoded_key_) { + RETURN_NOT_OK(EncodedKey::DecodeEncodedString( + tablet_schema, &arena_, base_data_->min_encoded_key_, &implicit_lb_key)); + spec->SetLowerBoundKey(implicit_lb_key); + modify_lower_bound_key = true; + } + + RETURN_NOT_OK(EncodedKey::DecodeEncodedString( + tablet_schema, &arena_, base_data_->max_encoded_key_, &implicit_ub_key)); + RETURN_NOT_OK(EncodedKey::IncrementEncodedKey(tablet_schema, &implicit_ub_key, &arena_)); + if (!ub_key || ub_key->encoded_key() > implicit_ub_key->encoded_key()) { + spec->SetExclusiveUpperBoundKey(implicit_ub_key); + modify_upper_bound_key = true; + } + + if (modify_lower_bound_key || modify_upper_bound_key) { + spec->UnifyPrimaryKeyBoundsAndColumnPredicates(tablet_schema, &arena_, true); + } + return Status::OK(); +} +Status CFileSet::Iterator::PushdownRangeScanPredicate(ScanSpec* spec) { if (spec == nullptr) { // No predicate. 
return Status::OK(); diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h index ae21b8232..7a119c4c6 100644 --- a/src/kudu/tablet/cfile_set.h +++ b/src/kudu/tablet/cfile_set.h @@ -38,6 +38,7 @@ #include "kudu/gutil/port.h" #include "kudu/tablet/rowset_metadata.h" #include "kudu/util/make_shared.h" +#include "kudu/util/memory/arena.h" #include "kudu/util/status.h" namespace kudu { @@ -222,18 +223,22 @@ class CFileSet::Iterator : public ColumnwiseIterator { friend class CFileSet; // 'projection' must remain valid for the lifetime of this object. - Iterator(std::shared_ptr<CFileSet const> base_data, const Schema* projection, + Iterator(std::shared_ptr<CFileSet const> base_data, + const Schema* projection, const fs::IOContext* io_context) : base_data_(std::move(base_data)), projection_(projection), initted_(false), cur_idx_(0), prepared_count_(0), - io_context_(io_context) {} + io_context_(io_context), + arena_(256) {} // Fill in col_iters_ for each of the requested columns. Status CreateColumnIterators(const ScanSpec* spec); + Status OptimizePKPredicates(ScanSpec* spec); + // Look for a predicate which can be converted into a range scan using the key // column's index. If such a predicate exists, remove it from the scan spec and // store it in member fields. @@ -276,6 +281,7 @@ class CFileSet::Iterator : public ColumnwiseIterator { // stored in 'col_iters_'. std::vector<cfile::ColumnIterator*> prepared_iters_; + Arena arena_; }; } // namespace tablet
