Predicate evaluation pushdown

The premise of this patch is to avoid excessive CPU use when evaluating column predicates in specific cases. Dictionary blocks, for instance, can evaluate a predicate by checking whether a string's codeword matches the predicate, rather than by doing string comparisons.
This patch uses a bitset to represent the set of codewords that match a given predicate on dictionary-encoded columns. Certain decoders now have the ability to evaluate a predicate without eagerly copying all of their underlying data into a buffer first. Rather, the decoders can first evaluate the predicate and copy only when needed. Since dictionary encoding falls back to plain encoding when a dictionary gets too large, plain encoding also supports decoder-level evaluation. Although plain encoding does not benefit from reduced string comparisons, decoder-level evaluation still improves its scan speeds by avoiding excessive copies. See the performance doc for a look into the performance differences for dictionary encoding and plain encoding: https://github.com/anjuwong/kudu/blob/pred-pushdown/docs/decoder-eval-perf.md See the predicate-eval-pushdown design doc for a brief overview of the considered implementations: https://github.com/anjuwong/kudu/blob/sorted-dict-block/docs/design-docs/predicate-eval-pushdown.md More in-depth analysis and benchmarking will follow in an upcoming blog post.
Change-Id: I31e4cce21e99f63b089d7c84410af8ed914cb576 Reviewed-on: http://gerrit.cloudera.org:8080/3990 Tested-by: Kudu Jenkins Reviewed-by: Todd Lipcon <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kudu/repo Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/c0f37278 Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/c0f37278 Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/c0f37278 Branch: refs/heads/master Commit: c0f37278cb09a7781d9073279ea54b08db6e2010 Parents: 211a5e4 Author: Andrew Wong <[email protected]> Authored: Mon Aug 15 13:07:18 2016 -0700 Committer: Todd Lipcon <[email protected]> Committed: Thu Sep 8 15:45:27 2016 +0000 ---------------------------------------------------------------------- src/kudu/cfile/binary_dict_block.cc | 60 +++- src/kudu/cfile/binary_dict_block.h | 8 + src/kudu/cfile/binary_plain_block.cc | 46 ++- src/kudu/cfile/binary_plain_block.h | 12 + src/kudu/cfile/block_encodings.h | 33 +- src/kudu/cfile/cfile-test-base.h | 16 +- src/kudu/cfile/cfile-test.cc | 36 ++- src/kudu/cfile/cfile_reader.cc | 65 ++-- src/kudu/cfile/cfile_reader.h | 36 ++- src/kudu/cfile/cfile_util.cc | 6 +- src/kudu/cfile/encoding-test.cc | 15 + .../common/column_materialization_context.h | 137 +++++++++ src/kudu/common/column_predicate.cc | 51 ++-- src/kudu/common/column_predicate.h | 3 + src/kudu/common/generic_iterators-test.cc | 12 +- src/kudu/common/generic_iterators.cc | 32 +- src/kudu/common/generic_iterators.h | 1 + src/kudu/common/iterator.h | 5 +- src/kudu/common/rowblock.h | 34 +++ src/kudu/common/schema.h | 6 + src/kudu/tablet/CMakeLists.txt | 1 + src/kudu/tablet/cfile_set-test.cc | 12 +- src/kudu/tablet/cfile_set.cc | 35 ++- src/kudu/tablet/cfile_set.h | 4 +- src/kudu/tablet/delta_applier.cc | 16 +- src/kudu/tablet/delta_applier.h | 2 +- src/kudu/tablet/delta_iterator_merger.cc | 9 + src/kudu/tablet/delta_iterator_merger.h | 1 + src/kudu/tablet/delta_store.h | 6 + src/kudu/tablet/deltafile.cc | 16 + 
src/kudu/tablet/deltafile.h | 1 + src/kudu/tablet/deltamemstore.cc | 12 + src/kudu/tablet/deltamemstore.h | 2 + src/kudu/tablet/tablet-decoder-eval-test.cc | 302 +++++++++++++++++++ src/kudu/tablet/tablet-test-util.h | 23 +- 35 files changed, 931 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/binary_dict_block.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/binary_dict_block.cc b/src/kudu/cfile/binary_dict_block.cc index 93411b3..478b1cc 100644 --- a/src/kudu/cfile/binary_dict_block.cc +++ b/src/kudu/cfile/binary_dict_block.cc @@ -187,8 +187,9 @@ Status BinaryDictBlockBuilder::GetLastKey(void* key_void) const { BinaryDictBlockDecoder::BinaryDictBlockDecoder(Slice slice, CFileIterator* iter) : data_(std::move(slice)), - parsed_(false) { - dict_decoder_ = iter->GetDictDecoder(); + parsed_(false), + dict_decoder_(iter->GetDictDecoder()), + parent_cfile_iter_(iter) { } Status BinaryDictBlockDecoder::ParseHeader() { @@ -247,6 +248,61 @@ Status BinaryDictBlockDecoder::SeekAtOrAfterValue(const void* value_void, bool* } } +// TODO: implement CopyNextAndEval for more blocks. Eg. other blocks can +// store their min/max values. CopyNextAndEval in these blocks could +// short-circuit if the query can does not search for values within the +// min/max range, or copy all and evaluate otherwise. +Status BinaryDictBlockDecoder::CopyNextAndEval(size_t* n, + ColumnMaterializationContext* ctx, + SelectionVectorView* sel, + ColumnDataView* dst) { + ctx->SetDecoderEvalSupported(); + if (mode_ == kPlainBinaryMode) { + // Copy all strings and evaluate them Slice-by-Slice. + return data_decoder_->CopyNextAndEval(n, ctx, sel, dst); + } + + // Predicates that have no matching words should return no data. 
+ SelectionVector* codewords_matching_pred = parent_cfile_iter_->GetCodeWordsMatchingPredicate(); + if (!codewords_matching_pred->AnySelected()) { + // If nothing is selected, move the data_decoder_ pointer forward and clear + // the corresponding bits in the selection vector. + int skip = static_cast<int>(*n); + data_decoder_->SeekForward(&skip); + *n = static_cast<size_t>(skip); + sel->ClearBits(*n); + return Status::OK(); + } + + // IsNotNull predicates should return all data. + if (ctx->pred()->predicate_type() == PredicateType::IsNotNull) { + return CopyNextDecodeStrings(n, dst); + } + + // Load the rows' codeword values into a buffer for scanning. + BShufBlockDecoder<UINT32>* d_bptr = down_cast<BShufBlockDecoder<UINT32>*>(data_decoder_.get()); + codeword_buf_.resize(*n * sizeof(uint32_t)); + d_bptr->CopyNextValuesToArray(n, codeword_buf_.data()); + Slice* out = reinterpret_cast<Slice*>(dst->data()); + Arena* out_arena = dst->arena(); + for (size_t i = 0; i < *n; i++, out++) { + // Check with the SelectionVectorView to see whether the data has already + // been cleared, in which case we can skip evaluation. + if (!sel->TestBit(i)) { + continue; + } + uint32_t codeword = *reinterpret_cast<uint32_t*>(&codeword_buf_[i*sizeof(uint32_t)]); + if (BitmapTest(codewords_matching_pred->bitmap(), codeword)) { + // Row is included in predicate, copy data to block. + CHECK(out_arena->RelocateSlice(dict_decoder_->string_at_index(codeword), out)); + } else { + // Mark that the row will not be returned. 
+ sel->ClearBit(i); + } + } + return Status::OK(); +} + Status BinaryDictBlockDecoder::CopyNextDecodeStrings(size_t* n, ColumnDataView* dst) { DCHECK(parsed_); CHECK_EQ(dst->type_info()->physical_type(), BINARY); http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/binary_dict_block.h ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/binary_dict_block.h b/src/kudu/cfile/binary_dict_block.h index 55352ff..cf17204 100644 --- a/src/kudu/cfile/binary_dict_block.h +++ b/src/kudu/cfile/binary_dict_block.h @@ -127,6 +127,10 @@ class BinaryDictBlockDecoder : public BlockDecoder { virtual void SeekToPositionInBlock(uint pos) OVERRIDE; virtual Status SeekAtOrAfterValue(const void* value, bool* exact_match) OVERRIDE; Status CopyNextValues(size_t* n, ColumnDataView* dst) OVERRIDE; + Status CopyNextAndEval(size_t* n, + ColumnMaterializationContext* ctx, + SelectionVectorView* sel, + ColumnDataView* dst) override; virtual bool HasNext() const OVERRIDE { return data_decoder_->HasNext(); @@ -157,6 +161,10 @@ class BinaryDictBlockDecoder : public BlockDecoder { gscoped_ptr<BlockDecoder> data_decoder_; + // Parent CFileIterator, each dictionary decoder in the same CFile will share + // the same vocabulary, and thus, the same set of matching codewords. 
+ CFileIterator* parent_cfile_iter_; + DictEncodingMode mode_; // buffer to hold the codewords, needed by CopyNextDecodeStrings() http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/binary_plain_block.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/binary_plain_block.cc b/src/kudu/cfile/binary_plain_block.cc index 6866b53..d1b4129 100644 --- a/src/kudu/cfile/binary_plain_block.cc +++ b/src/kudu/cfile/binary_plain_block.cc @@ -17,12 +17,13 @@ #include "kudu/cfile/binary_plain_block.h" -#include <glog/logging.h> #include <algorithm> +#include <glog/logging.h> #include "kudu/cfile/cfile_util.h" #include "kudu/cfile/cfile_writer.h" #include "kudu/common/columnblock.h" +#include "kudu/common/rowblock.h" #include "kudu/gutil/stringprintf.h" #include "kudu/util/coding.h" #include "kudu/util/coding-inl.h" @@ -270,36 +271,49 @@ Status BinaryPlainBlockDecoder::SeekAtOrAfterValue(const void *value_void, bool return Status::OK(); } -Status BinaryPlainBlockDecoder::CopyNextValues(size_t *n, ColumnDataView *dst) { +template <typename CellHandler> +Status BinaryPlainBlockDecoder::HandleBatch(size_t* n, ColumnDataView* dst, CellHandler c) { DCHECK(parsed_); CHECK_EQ(dst->type_info()->physical_type(), BINARY); DCHECK_LE(*n, dst->nrows()); DCHECK_EQ(dst->stride(), sizeof(Slice)); - Arena *out_arena = dst->arena(); if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) { *n = 0; return Status::OK(); } - size_t max_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_)); - Slice *out = reinterpret_cast<Slice *>(dst->data()); - size_t i; - for (i = 0; i < max_fetch; i++) { + Slice *out = reinterpret_cast<Slice*>(dst->data()); + for (size_t i = 0; i < max_fetch; i++, out++, cur_idx_++) { Slice elem(string_at_index(cur_idx_)); - - // TODO: in a lot of cases, we might be able to get away with the decoder - // owning it and not truly copying. 
But, we should extend the CopyNextValues - // API so that the caller can specify if they truly _need_ copies or not. - CHECK(out_arena->RelocateSlice(elem, out)); - out++; - cur_idx_++; + c(i, elem, out, out_arena); } - - *n = i; + *n = max_fetch; return Status::OK(); } +Status BinaryPlainBlockDecoder::CopyNextValues(size_t* n, ColumnDataView* dst) { + return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) { + CHECK(out_arena->RelocateSlice(elem, out)); + }); +} +Status BinaryPlainBlockDecoder::CopyNextAndEval(size_t* n, + ColumnMaterializationContext* ctx, + SelectionVectorView* sel, + ColumnDataView* dst) { + ctx->SetDecoderEvalSupported(); + return HandleBatch(n, dst, [&](size_t i, Slice elem, Slice* out, Arena* out_arena) { + if (!sel->TestBit(i)) { + return; + } else if (ctx->pred()->EvaluateCell(static_cast<const void*>(&elem))) { + CHECK(out_arena->RelocateSlice(elem, out)); + } else { + sel->ClearBit(i); + } + }); +} + + } // namespace cfile } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/binary_plain_block.h ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/binary_plain_block.h b/src/kudu/cfile/binary_plain_block.h index 1327f92..5b64a18 100644 --- a/src/kudu/cfile/binary_plain_block.h +++ b/src/kudu/cfile/binary_plain_block.h @@ -97,6 +97,10 @@ class BinaryPlainBlockDecoder : public BlockDecoder { virtual Status SeekAtOrAfterValue(const void *value, bool *exact_match) OVERRIDE; Status CopyNextValues(size_t *n, ColumnDataView *dst) OVERRIDE; + Status CopyNextAndEval(size_t* n, + ColumnMaterializationContext* ctx, + SelectionVectorView* sel, + ColumnDataView* dst) override; virtual bool HasNext() const OVERRIDE { DCHECK(parsed_); @@ -127,6 +131,14 @@ class BinaryPlainBlockDecoder : public BlockDecoder { static const size_t kMinHeaderSize = sizeof(uint32_t) * 3; private: + // Helper template for handling batches of rows. 
CellHandler is a lambda that + // gets called on every cell. When decoder evaluation is enabled, it + // evaluates whether or not the string should be copied and sets a + // SelectionVectorView bit at the appropriate location. When decoder + // evaluation is disabled, it copies the cell's string to dst. + template <typename CellHandler> + Status HandleBatch(size_t* n, ColumnDataView* dst, CellHandler c); + Slice data_; bool parsed_; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/block_encodings.h ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/block_encodings.h b/src/kudu/cfile/block_encodings.h index 76649cb..9abd5db 100644 --- a/src/kudu/cfile/block_encodings.h +++ b/src/kudu/cfile/block_encodings.h @@ -18,11 +18,13 @@ #ifndef KUDU_CFILE_BLOCK_ENCODINGS_H #define KUDU_CFILE_BLOCK_ENCODINGS_H +#include <algorithm> #include <stdint.h> - #include <glog/logging.h> +#include "kudu/common/column_materialization_context.h" #include "kudu/common/rowid.h" +#include "kudu/common/rowblock.h" #include "kudu/cfile/cfile.pb.h" #include "kudu/gutil/macros.h" #include "kudu/util/faststring.h" @@ -128,6 +130,18 @@ class BlockDecoder { virtual Status SeekAtOrAfterValue(const void *value, bool *exact_match) = 0; + // Seek the decoder forward by a given number of rows, or to the end + // of the block. This is primarily used to skip over data. + // + // If *n would move the index past the end of the block, set *n to the + // number of rows to get to the end. + virtual void SeekForward(int* n) { + DCHECK(HasNext()); + *n = std::min(*n, static_cast<int>(Count() - GetCurrentIndex())); + DCHECK_GE(*n, 0); + SeekToPositionInBlock(GetCurrentIndex() + *n); + } + // Fetch the next set of values from the block into 'dst'. // The output block must have space for up to n cells. // @@ -138,6 +152,23 @@ class BlockDecoder { // allocated in the dst block's arena. 
virtual Status CopyNextValues(size_t *n, ColumnDataView *dst) = 0; + // Fetch the next values from the block and evaluate whether they satisfy + // the predicate. Mark the row in the view into the selection vector. This + // view denotes the current location in the CFile. + // + // Modifies *n to contain the number of values fetched. + // + // POSTCONDITION: ctx->decoder_eval_supported_ is not kNotSet. State must + // be consistent throughout the entire column. + virtual Status CopyNextAndEval(size_t* n, + ColumnMaterializationContext* ctx, + SelectionVectorView* sel, + ColumnDataView* dst) { + RETURN_NOT_OK(CopyNextValues(n, dst)); + ctx->SetDecoderEvalNotSupported(); + return Status::OK(); + } + // Return true if there are more values remaining to be iterated. // (i.e that the next call to CopyNextValues will return at least 1 // element) http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/cfile-test-base.h ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/cfile-test-base.h b/src/kudu/cfile/cfile-test-base.h index 7213bfa..000c741 100644 --- a/src/kudu/cfile/cfile-test-base.h +++ b/src/kudu/cfile/cfile-test-base.h @@ -302,6 +302,11 @@ class CFileTestBase : public KuduTest { ASSERT_OK(fs_manager_->Open()); } + // Place a ColumnBlock and SelectionVector into a context. This context will + // not support decoder evaluation. 
+ ColumnMaterializationContext CreateNonDecoderEvalContext(ColumnBlock* cb, SelectionVector* sel) { + return ColumnMaterializationContext(0, nullptr, cb, sel); + } protected: enum Flags { NO_FLAGS = 0, @@ -390,11 +395,13 @@ SumType FastSum(const Indexable &data, size_t n) { template<DataType Type, typename SumType> void TimeReadFileForDataType(gscoped_ptr<CFileIterator> &iter, int &count) { ScopedColumnBlock<Type> cb(8192); - + SelectionVector sel(cb.nrows()); + ColumnMaterializationContext ctx(0, nullptr, &cb, &sel); + ctx.SetDecoderEvalNotSupported(); SumType sum = 0; while (iter->HasNext()) { size_t n = cb.nrows(); - ASSERT_OK_FAST(iter->CopyNextValues(&n, &cb)); + ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx)); sum += FastSum<ScopedColumnBlock<Type>, SumType>(cb, n); count += n; cb.arena()->Reset(); @@ -406,10 +413,13 @@ void TimeReadFileForDataType(gscoped_ptr<CFileIterator> &iter, int &count) { template<DataType Type> void ReadBinaryFile(CFileIterator* iter, int* count) { ScopedColumnBlock<Type> cb(100); + SelectionVector sel(cb.nrows()); + ColumnMaterializationContext ctx(0, nullptr, &cb, &sel); + ctx.SetDecoderEvalNotSupported(); uint64_t sum_lens = 0; while (iter->HasNext()) { size_t n = cb.nrows(); - ASSERT_OK_FAST(iter->CopyNextValues(&n, &cb)); + ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx)); for (int i = 0; i < n; i++) { sum_lens += cb[i].size(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/cfile-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/cfile-test.cc b/src/kudu/cfile/cfile-test.cc index 0e33283..50d7846 100644 --- a/src/kudu/cfile/cfile-test.cc +++ b/src/kudu/cfile/cfile-test.cc @@ -70,14 +70,13 @@ class TestCFile : public CFileTestBase { ASSERT_OK(CFileReader::Open(std::move(block), ReaderOptions(), &reader)); BlockPointer ptr; - gscoped_ptr<CFileIterator> iter; ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK)); 
ASSERT_OK(iter->SeekToOrdinal(5000)); ASSERT_EQ(5000u, iter->GetCurrentOrdinal()); - // Seek to last key exactly, should succeed + // Seek to last key exactly, should succeed. ASSERT_OK(iter->SeekToOrdinal(9999)); ASSERT_EQ(9999u, iter->GetCurrentOrdinal()); @@ -91,7 +90,9 @@ class TestCFile : public CFileTestBase { // Fetch all data. ScopedColumnBlock<DataGeneratorType::kDataType> out(10000); size_t n = 10000; - ASSERT_OK(iter->CopyNextValues(&n, &out)); + SelectionVector sel(10000); + ColumnMaterializationContext out_ctx = CreateNonDecoderEvalContext(&out, &sel); + ASSERT_OK(iter->CopyNextValues(&n, &out_ctx)); ASSERT_EQ(10000, n); DataGeneratorType data_generator_pre; @@ -117,10 +118,11 @@ class TestCFile : public CFileTestBase { ColumnBlock advancing_block(out.type_info(), nullptr, out.data() + (fetched * out.stride()), out.nrows() - fetched, out.arena()); + ColumnMaterializationContext adv_ctx = CreateNonDecoderEvalContext(&advancing_block, &sel); ASSERT_TRUE(iter->HasNext()); size_t batch_size = random() % 5 + 1; size_t n = batch_size; - ASSERT_OK(iter->CopyNextValues(&n, &advancing_block)); + ASSERT_OK(iter->CopyNextValues(&n, &adv_ctx)); ASSERT_LE(n, batch_size); fetched += n; } @@ -157,6 +159,8 @@ class TestCFile : public CFileTestBase { Arena arena(8192, 8*1024*1024); ScopedColumnBlock<DataGeneratorType::kDataType> cb(10); + SelectionVector sel(10); + ColumnMaterializationContext ctx = CreateNonDecoderEvalContext(&cb, &sel); const int kNumLoops = AllowSlowTests() ? num_entries : 10; for (int loop = 0; loop < kNumLoops; loop++) { // Seek to a random point in the file, @@ -170,7 +174,7 @@ class TestCFile : public CFileTestBase { int read_offset = target; for (int block = 0; block < 3 && iter->HasNext(); block++) { size_t n = cb.nrows(); - ASSERT_OK_FAST(iter->CopyNextValues(&n, &cb)); + ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx)); ASSERT_EQ(n, std::min(num_entries - read_offset, cb.nrows())); // Verify that the block data is correct. 
@@ -332,8 +336,11 @@ void CopyOne(CFileIterator *it, typename TypeTraits<type>::cpp_type *ret, Arena *arena) { ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, arena); + SelectionVector sel(1); + ColumnMaterializationContext ctx(0, nullptr, &cb, &sel); + ctx.SetDecoderEvalNotSupported(); size_t n = 1; - ASSERT_OK(it->CopyNextValues(&n, &cb)); + ASSERT_OK(it->CopyNextValues(&n, &ctx)); ASSERT_EQ(1, n); } @@ -585,11 +592,13 @@ void TestCFile::TestReadWriteStrings(EncodingType encoding, // Reseek to start and fetch all data. // We fetch in 10 smaller chunks to avoid using too much RAM for the // case where the values are large. + SelectionVector sel(10000); ASSERT_OK(iter->SeekToFirst()); for (int i = 0; i < 10; i++) { ScopedColumnBlock<STRING> cb(10000); + ColumnMaterializationContext cb_ctx = CreateNonDecoderEvalContext(&cb, &sel); size_t n = 1000; - ASSERT_OK(iter->CopyNextValues(&n, &cb)); + ASSERT_OK(iter->CopyNextValues(&n, &cb_ctx)); ASSERT_EQ(1000, n); } } @@ -678,7 +687,9 @@ TEST_P(TestCFileBothCacheTypes, TestDefaultColumnIter) { uint32_t int_value = 15; DefaultColumnValueIterator iter(GetTypeInfo(UINT32), &int_value); ColumnBlock int_col(GetTypeInfo(UINT32), nullptr, data, kNumItems, nullptr); - ASSERT_OK(iter.Scan(&int_col)); + SelectionVector sel(kNumItems); + ColumnMaterializationContext int_ctx = CreateNonDecoderEvalContext(&int_col, &sel); + ASSERT_OK(iter.Scan(&int_ctx)); for (size_t i = 0; i < int_col.nrows(); ++i) { ASSERT_EQ(int_value, *reinterpret_cast<const uint32_t *>(int_col.cell_ptr(i))); } @@ -687,7 +698,8 @@ TEST_P(TestCFileBothCacheTypes, TestDefaultColumnIter) { int_value = 321; DefaultColumnValueIterator nullable_iter(GetTypeInfo(UINT32), &int_value); ColumnBlock nullable_col(GetTypeInfo(UINT32), null_bitmap, data, kNumItems, nullptr); - ASSERT_OK(nullable_iter.Scan(&nullable_col)); + ColumnMaterializationContext nullable_ctx = CreateNonDecoderEvalContext(&nullable_col, &sel); + ASSERT_OK(nullable_iter.Scan(&nullable_ctx)); for (size_t 
i = 0; i < nullable_col.nrows(); ++i) { ASSERT_FALSE(nullable_col.is_null(i)); ASSERT_EQ(int_value, *reinterpret_cast<const uint32_t *>(nullable_col.cell_ptr(i))); @@ -696,7 +708,8 @@ TEST_P(TestCFileBothCacheTypes, TestDefaultColumnIter) { // Test NULL Default Value DefaultColumnValueIterator null_iter(GetTypeInfo(UINT32), nullptr); ColumnBlock null_col(GetTypeInfo(UINT32), null_bitmap, data, kNumItems, nullptr); - ASSERT_OK(null_iter.Scan(&null_col)); + ColumnMaterializationContext null_ctx = CreateNonDecoderEvalContext(&null_col, &sel); + ASSERT_OK(null_iter.Scan(&null_ctx)); for (size_t i = 0; i < null_col.nrows(); ++i) { ASSERT_TRUE(null_col.is_null(i)); } @@ -707,7 +720,8 @@ TEST_P(TestCFileBothCacheTypes, TestDefaultColumnIter) { Arena arena(32*1024, 256*1024); DefaultColumnValueIterator str_iter(GetTypeInfo(STRING), &str_value); ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &arena); - ASSERT_OK(str_iter.Scan(&str_col)); + ColumnMaterializationContext str_ctx = CreateNonDecoderEvalContext(&str_col, &sel); + ASSERT_OK(str_iter.Scan(&str_ctx)); for (size_t i = 0; i < str_col.nrows(); ++i) { ASSERT_EQ(str_value, *reinterpret_cast<const Slice *>(str_col.cell_ptr(i))); } http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/cfile_reader.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/cfile_reader.cc b/src/kudu/cfile/cfile_reader.cc index 0644f1c..420596b 100644 --- a/src/kudu/cfile/cfile_reader.cc +++ b/src/kudu/cfile/cfile_reader.cc @@ -469,6 +469,7 @@ size_t CFileReader::memory_footprint() const { //////////////////////////////////////////////////////////// // Default Column Value Iterator //////////////////////////////////////////////////////////// + Status DefaultColumnValueIterator::SeekToOrdinal(rowid_t ord_idx) { ordinal_ = ord_idx; return Status::OK(); @@ -479,7 +480,8 @@ Status DefaultColumnValueIterator::PrepareBatch(size_t *n) { return Status::OK(); } 
-Status DefaultColumnValueIterator::Scan(ColumnBlock *dst) { +Status DefaultColumnValueIterator::Scan(ColumnMaterializationContext* ctx) { + ColumnBlock* dst = ctx->block(); if (dst->is_nullable()) { ColumnDataView dst_view(dst); dst_view.SetNullBits(dst->nrows(), value_ != nullptr); @@ -616,8 +618,6 @@ Status CFileIterator::SeekToFirst() { return Status::OK(); } - - Status CFileIterator::SeekAtOrAfter(const EncodedKey &key, bool *exact_match) { RETURN_NOT_OK(PrepareForNewSeek()); @@ -893,16 +893,28 @@ Status CFileIterator::FinishBatch() { return Status::OK(); } - -Status CFileIterator::Scan(ColumnBlock *dst) { +Status CFileIterator::Scan(ColumnMaterializationContext* ctx) { CHECK(seeked_) << "not seeked"; - // Use a column data view to been able to advance it as we read into it. - ColumnDataView remaining_dst(dst); - + // Use views to advance the block and selection vector as we read into them. + ColumnDataView remaining_dst(ctx->block()); + SelectionVectorView remaining_sel(ctx->sel()); uint32_t rem = last_prepare_count_; - DCHECK_LE(rem, dst->nrows()); - + DCHECK_LE(rem, ctx->block()->nrows()); + + // Determine the matching codewords for dictionary encoding if they haven't + // yet been determined for this CFile. + if (dict_decoder_ && ctx->DecoderEvalNotDisabled() && !codewords_matching_pred_) { + size_t nwords = dict_decoder_->Count(); + codewords_matching_pred_.reset(new SelectionVector(nwords)); + codewords_matching_pred_->SetAllFalse(); + for (size_t i = 0; i < nwords; i++) { + Slice cur_string = dict_decoder_->string_at_index(i); + if (ctx->pred()->EvaluateCell(static_cast<const void *>(&cur_string))) { + BitmapSet(codewords_matching_pred_->mutable_bitmap(), i); + } + } + } for (PreparedBlock *pb : prepared_blocks_) { if (pb->needs_rewind_) { // Seek back to the saved position. 
@@ -911,12 +923,10 @@ Status CFileIterator::Scan(ColumnBlock *dst) { // that might be more efficient (allowing the decoder to save internal state // instead of having to reconstruct it) } - if (reader_->is_nullable()) { - DCHECK(dst->is_nullable()); + DCHECK(ctx->block()->is_nullable()); size_t nrows = std::min(rem, pb->num_rows_in_block_ - pb->idx_in_block_); - // Fill column bitmap size_t count = nrows; while (count > 0) { @@ -928,11 +938,17 @@ Status CFileIterator::Scan(ColumnBlock *dst) { Substitute("Unexpected EOF on NULL bitmap read. Expected at least $0 more rows", count)); } - size_t this_batch = nblock; if (not_null) { // TODO: Maybe copy all and shift later? - RETURN_NOT_OK(pb->dblk_->CopyNextValues(&this_batch, &remaining_dst)); + if (ctx->DecoderEvalNotDisabled()) { + RETURN_NOT_OK(pb->dblk_->CopyNextAndEval(&this_batch, + ctx, + &remaining_sel, + &remaining_dst)); + } else { + RETURN_NOT_OK(pb->dblk_->CopyNextValues(&this_batch, &remaining_dst)); + } DCHECK_EQ(nblock, this_batch); pb->needs_rewind_ = true; } else { @@ -941,6 +957,9 @@ Status CFileIterator::Scan(ColumnBlock *dst) { remaining_dst.stride() * nblock, "NULLNULLNULLNULLNULL"); #endif + if (ctx->DecoderEvalNotDisabled()) { + remaining_sel.ClearBits(this_batch); + } } // Set the ColumnBlock bitmap @@ -950,22 +969,29 @@ Status CFileIterator::Scan(ColumnBlock *dst) { count -= this_batch; pb->idx_in_block_ += this_batch; remaining_dst.Advance(this_batch); + remaining_sel.Advance(this_batch); } } else { // Fetch as many as we can from the current datablock. 
size_t this_batch = rem; - RETURN_NOT_OK(pb->dblk_->CopyNextValues(&this_batch, &remaining_dst)); + + if (ctx->DecoderEvalNotDisabled()) { + RETURN_NOT_OK(pb->dblk_->CopyNextAndEval(&this_batch, ctx, &remaining_sel, &remaining_dst)); + } else { + RETURN_NOT_OK(pb->dblk_->CopyNextValues(&this_batch, &remaining_dst)); + } pb->needs_rewind_ = true; DCHECK_LE(this_batch, rem); // If the column is nullable, set all bits to true - if (dst->is_nullable()) { + if (ctx->block()->is_nullable()) { remaining_dst.SetNullBits(this_batch, true); } rem -= this_batch; pb->idx_in_block_ += this_batch; remaining_dst.Advance(this_batch); + remaining_sel.Advance(this_batch); } // If we didn't fetch as many as requested, then it should @@ -982,13 +1008,12 @@ Status CFileIterator::Scan(ColumnBlock *dst) { return Status::OK(); } -Status CFileIterator::CopyNextValues(size_t *n, ColumnBlock *cb) { +Status CFileIterator::CopyNextValues(size_t* n, ColumnMaterializationContext* ctx) { RETURN_NOT_OK(PrepareBatch(n)); - RETURN_NOT_OK(Scan(cb)); + RETURN_NOT_OK(Scan(ctx)); RETURN_NOT_OK(FinishBatch()); return Status::OK(); } - } // namespace cfile } // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/cfile_reader.h ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/cfile_reader.h b/src/kudu/cfile/cfile_reader.h index f1db145..40106d3 100644 --- a/src/kudu/cfile/cfile_reader.h +++ b/src/kudu/cfile/cfile_reader.h @@ -239,11 +239,13 @@ class ColumnIterator { virtual Status PrepareBatch(size_t *n) = 0; // Copy values into the prepared column block. - // Any indirected values (eg strings) are copied into the dst block's + // Any indirected values (eg strings) are copied into the ctx's block's // arena. // This does _not_ advance the position in the underlying file. Multiple // calls to Scan() will re-read the same values. 
- virtual Status Scan(ColumnBlock *dst) = 0; + // If decoder eval is supported and allowed, will additionally evaluate the + // column predicate. + virtual Status Scan(ColumnMaterializationContext* ctx) = 0; // Finish processing the current batch, advancing the iterators // such that the next call to PrepareBatch() will start where the previous @@ -266,14 +268,14 @@ class DefaultColumnValueIterator : public ColumnIterator { : typeinfo_(typeinfo), value_(value), ordinal_(0) { } - Status SeekToOrdinal(rowid_t ord_idx) OVERRIDE; + Status SeekToOrdinal(rowid_t ord_idx) override; bool seeked() const OVERRIDE { return true; } rowid_t GetCurrentOrdinal() const OVERRIDE { return ordinal_; } - Status PrepareBatch(size_t *n) OVERRIDE; - Status Scan(ColumnBlock *dst) OVERRIDE; + Status PrepareBatch(size_t* n) OVERRIDE; + Status Scan(ColumnMaterializationContext* ctx) override; Status FinishBatch() OVERRIDE; const IteratorStats& io_statistics() const OVERRIDE { return io_stats_; } @@ -303,7 +305,7 @@ class CFileIterator : public ColumnIterator { // If provided seek point is past the end of the file, // then returns a NotFound Status. // TODO: do we ever want to be able to seek to the end of the file? - Status SeekToOrdinal(rowid_t ord_idx) OVERRIDE; + Status SeekToOrdinal(rowid_t ord_idx) override; // Seek the index to the given row_key, or to the index entry immediately // before it. Then (if the index is sparse) seek the data block to the @@ -346,7 +348,7 @@ class CFileIterator : public ColumnIterator { // arena. // This does _not_ advance the position in the underlying file. Multiple // calls to Scan() will re-read the same values. 
- Status Scan(ColumnBlock *dst) OVERRIDE; + Status Scan(ColumnMaterializationContext* ctx) override; // Finish processing the current batch, advancing the iterators // such that the next call to PrepareBatch() will start where the previous @@ -357,16 +359,23 @@ class CFileIterator : public ColumnIterator { bool HasNext() const; // Convenience method to prepare a batch, scan it, and finish it. - Status CopyNextValues(size_t *n, ColumnBlock *dst); + Status CopyNextValues(size_t* n, ColumnMaterializationContext* ctx); const IteratorStats &io_statistics() const OVERRIDE { return io_stats_; } - // It the column is dictionary-coded, returns the decoder + // If the column is dictionary-coded, returns the decoder // for the cfile's dictionary block. This is called by the - // StringDictBlockDecoder. - BinaryPlainBlockDecoder* GetDictDecoder() { return dict_decoder_.get();} + // BinaryDictBlockDecoder. + BinaryPlainBlockDecoder* GetDictDecoder() { return dict_decoder_.get(); } + + // If the column is dictionary-coded and a predicate on the column exists, + // returns the set of codewords that pass the predicate. Since a vocabulary + // is shared among the multiple BinaryDictBlockDecoders in a single cfile, + // the reader must expose an interface for all decoders to access the + // single set of predicate-satisfying codewords. + SelectionVector* GetCodeWordsMatchingPredicate() { return codewords_matching_pred_.get(); } private: DISALLOW_COPY_AND_ASSIGN(CFileIterator); @@ -434,10 +443,13 @@ class CFileIterator : public ColumnIterator { gscoped_ptr<IndexTreeIterator> posidx_iter_; gscoped_ptr<IndexTreeIterator> validx_iter_; - // Decoder for the dictionary block + // Decoder for the dictionary block. gscoped_ptr<BinaryPlainBlockDecoder> dict_decoder_; BlockHandle dict_block_handle_; + // Set containing the codewords that match the predicate in a dictionary. + std::unique_ptr<SelectionVector> codewords_matching_pred_; + // The currently in-use index iterator. 
This is equal to either // posidx_iter_.get(), validx_iter_.get(), or NULL if not seeked. IndexTreeIterator *seeked_; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/cfile_util.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/cfile_util.cc b/src/kudu/cfile/cfile_util.cc index 393e45c..06c7813 100644 --- a/src/kudu/cfile/cfile_util.cc +++ b/src/kudu/cfile/cfile_util.cc @@ -22,6 +22,7 @@ #include <string> #include "kudu/cfile/cfile_reader.h" +#include "kudu/common/column_materialization_context.h" #include "kudu/util/env.h" #include "kudu/util/mem_tracker.h" @@ -44,14 +45,15 @@ Status DumpIterator(const CFileReader& reader, size_t max_rows = kBufSize/type->size(); uint8_t nulls[BitmapSize(max_rows)]; ColumnBlock cb(type, reader.is_nullable() ? nulls : nullptr, buf, max_rows, &arena); - + SelectionVector sel(max_rows); + ColumnMaterializationContext ctx(0, nullptr, &cb, &sel); string strbuf; size_t count = 0; while (it->HasNext()) { size_t n = num_rows == 0 ? max_rows : std::min(max_rows, num_rows - count); if (n == 0) break; - RETURN_NOT_OK(it->CopyNextValues(&n, &cb)); + RETURN_NOT_OK(it->CopyNextValues(&n, &ctx)); if (reader.is_nullable()) { for (size_t i = 0; i < n; i++) { http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/cfile/encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/cfile/encoding-test.cc b/src/kudu/cfile/encoding-test.cc index 73b2f6d..a3717f6 100644 --- a/src/kudu/cfile/encoding-test.cc +++ b/src/kudu/cfile/encoding-test.cc @@ -524,6 +524,21 @@ class TestEncoding : public ::testing::Test { CopyOne<IntType>(&ibd, &ret); EXPECT_EQ(decoded[seek_off], ret); } + + // Test Seek forward within block. + ibd.SeekToPositionInBlock(0); + int skip_step = 7; + EXPECT_EQ((uint32_t) 0, ibd.GetCurrentIndex()); + for (uint32_t i = 0; i < decoded.size()/skip_step; i++) { + // Skip just before the end of the step. 
+ int skip = skip_step-1; + ibd.SeekForward(&skip); + EXPECT_EQ((uint32_t) i*skip_step+skip, ibd.GetCurrentIndex()); + CppType ret; + // CopyOne will move the decoder forward by one. + CopyOne<IntType>(&ibd, &ret); + EXPECT_EQ(decoded[i*skip_step + skip], ret); + } } http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/column_materialization_context.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/column_materialization_context.h b/src/kudu/common/column_materialization_context.h new file mode 100644 index 0000000..7fdfd0c --- /dev/null +++ b/src/kudu/common/column_materialization_context.h @@ -0,0 +1,137 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "kudu/common/column_predicate.h" +#include "kudu/common/columnblock.h" + +namespace kudu { + +// A ColumnMaterializationContext provides a clean interface to the set of +// objects that get passed down to the column iterator and decoders during +// materialization. 
+// +// Example of path taken: +// MaterializingIterator -> CFileSet::Iterator -> CFileIterator -> BlockDecoder +class ColumnMaterializationContext { + public: + ColumnMaterializationContext(size_t col_idx, + const ColumnPredicate* pred, + ColumnBlock* block, + SelectionVector* sel) + : col_idx_(col_idx), + pred_(pred), + block_(block), + sel_(sel), + decoder_eval_status_(kNotSet) { + if (!pred_ || !sel || !block) { + decoder_eval_status_ = kDecoderEvalNotSupported; + } + } + + // Column index within the projection schema, not the underlying schema. + const size_t col_idx() { return col_idx_; } + + // Predicate being evaluated. + const ColumnPredicate* pred() { return pred_; } + + // Destination for copied data. + ColumnBlock* block() { return block_; } + + // Selection vector reflecting the result of the predicate evaluation. + // If a given bit is already cleared, that row can be skipped. + // + // Required non-null if used by scans, else there will be a nullptr reference + // when creating a SelectionVectorView around it. Must be large enough to + // cover the number of rows being scanned. + SelectionVector* sel() { return sel_; } + + // Checked after returning from the decoder to determine whether or not the + // block must still be evaluated (on true). + bool DecoderEvalNotSupported() const { + return decoder_eval_status_ == kDecoderEvalNotSupported; + } + + // Checked by CFileIterator::Scan() to determine whether decoder-level + // evaluation should be attempted (on true). + bool DecoderEvalNotDisabled() const { + return decoder_eval_status_ != kDecoderEvalNotSupported; + } + + // A context should not switch from supporting decoder-level eval to not + // supporting it, or vice versa. + // + // Going from NotSupported to Supported may lead to incorrect results if + // previously scanned rows materialize values that should not be in the + // result set. To prevent this, attempts to do this will result in no-ops. 
+ // This will disable decoder-level evaluation and yield correct values. + // + // Should only be called by CopyNextAndEval() of a decoder that supports + // evaluation. + void SetDecoderEvalSupported() { + DCHECK(decoder_eval_status_ != kDecoderEvalNotSupported); + if (decoder_eval_status_ == kNotSet) { + DCHECK(sel_ != nullptr && pred_ != nullptr); + decoder_eval_status_ = kDecoderEvalSupported; + } + } + + // Going from Supported to NotSupported may lead to errors if the scan + // previously evaluated rows that were deemed to be not in the result set. + // During batch evaluation, these rows would be missing from the buffer, + // and this could lead to a memory error. + // + // Should be called before materializing a column if support has manually + // been turned off with materializing_iterator_decoder_eval flag, or by + // CopyNextAndEval() of a decoder that does not support evaluation. + void SetDecoderEvalNotSupported() { + CHECK(decoder_eval_status_ != kDecoderEvalSupported); + decoder_eval_status_ = kDecoderEvalNotSupported; + } + + private: + enum DecoderEvalStatus { + // During scan, will try to evaluate with the decoder, after which the + // correct status will be set. + kNotSet = 0, + + // May be set before scanning if the decoder eval flag is set to false or + // if iterator has deltas associated with it. + // May be set by decoder during scan if decoder eval is not supported. + // Once set, scanning will materialize the entire column into the block, + // leaving evaluation for after the scan. + kDecoderEvalNotSupported, + + // Is set by a decoder during scan if decoder eval is supported. + // Once set, the decoder is contractually bound to return with selection + // vector filled out. 
+ kDecoderEvalSupported + }; + + const size_t col_idx_; + + const ColumnPredicate* pred_; + + ColumnBlock* block_; + + SelectionVector* const sel_; + + DecoderEvalStatus decoder_eval_status_; +}; + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/column_predicate.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc index fe3aef4..ab8ef90 100644 --- a/src/kudu/common/column_predicate.cc +++ b/src/kudu/common/column_predicate.cc @@ -265,9 +265,8 @@ void ApplyPredicate(const ColumnBlock& block, SelectionVector* sel, P p) { } } // anonymous namespace -void ColumnPredicate::Evaluate(const ColumnBlock& block, SelectionVector *sel) const { +void ColumnPredicate::Evaluate(const ColumnBlock& block, SelectionVector* sel) const { CHECK_NOTNULL(sel); - // The type-specific predicate is provided as a function template to // ApplyPredicate in the hope that they are inlined. // @@ -280,34 +279,28 @@ void ColumnPredicate::Evaluate(const ColumnBlock& block, SelectionVector *sel) c // TODO: equality predicates should use the bloomfilter if it's available. 
switch (predicate_type()) { - case PredicateType::None: { - ApplyPredicate(block, sel, [] (const void*) { - return false; - }); - return; - }; case PredicateType::Range: { if (lower_ == nullptr) { ApplyPredicate(block, sel, [this] (const void* cell) { - return column_.type_info()->Compare(cell, this->upper_) < 0; + return column_.type_info()->Compare(cell, this->upper_) < 0; }); } else if (upper_ == nullptr) { ApplyPredicate(block, sel, [this] (const void* cell) { - return column_.type_info()->Compare(cell, this->lower_) >= 0; + return column_.type_info()->Compare(cell, this->lower_) >= 0; }); } else { ApplyPredicate(block, sel, [this] (const void* cell) { - return column_.type_info()->Compare(cell, this->upper_) < 0 && - column_.type_info()->Compare(cell, this->lower_) >= 0; + return column_.type_info()->Compare(cell, this->upper_) < 0 && + column_.type_info()->Compare(cell, this->lower_) >= 0; }); } return; }; case PredicateType::Equality: { - ApplyPredicate(block, sel, [this] (const void* cell) { - return column_.type_info()->Compare(cell, this->lower_) == 0; - }); - return; + ApplyPredicate(block, sel, [this] (const void* cell) { + return column_.type_info()->Compare(cell, this->lower_) == 0; + }); + return; }; case PredicateType::IsNotNull: { if (!block.is_nullable()) return; @@ -320,8 +313,32 @@ void ColumnPredicate::Evaluate(const ColumnBlock& block, SelectionVector *sel) c } return; } + default: + LOG(FATAL) << "unknown predicate type"; + } +} + +bool ColumnPredicate::EvaluateCell(const void* cell) const { + switch (predicate_type()) { + case PredicateType::Range: { + if (lower_ == nullptr) { + return column_.type_info()->Compare(cell, this->upper_) < 0; + } else if (upper_ == nullptr) { + return column_.type_info()->Compare(cell, this->lower_) >= 0; + } else { + return column_.type_info()->Compare(cell, this->upper_) < 0 && + column_.type_info()->Compare(cell, this->lower_) >= 0; + } + }; + case PredicateType::Equality: { + return 
column_.type_info()->Compare(cell, this->lower_) == 0; + }; + case PredicateType::IsNotNull: { + return true; + }; + default: + LOG(FATAL) << "unknown predicate type"; } - LOG(FATAL) << "unknown predicate type"; } string ColumnPredicate::ToString() const { http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/column_predicate.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/column_predicate.h b/src/kudu/common/column_predicate.h index a548317..80b77c8 100644 --- a/src/kudu/common/column_predicate.h +++ b/src/kudu/common/column_predicate.h @@ -138,6 +138,9 @@ class ColumnPredicate { // same vector as block->selection_vector(). void Evaluate(const ColumnBlock& block, SelectionVector* sel) const; + // Evaluate the predicate on a single cell. + bool EvaluateCell(const void *cell) const; + // Print the predicate for debugging. std::string ToString() const; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/generic_iterators-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/generic_iterators-test.cc b/src/kudu/common/generic_iterators-test.cc index 12ed7d7..7b16fb2 100644 --- a/src/kudu/common/generic_iterators-test.cc +++ b/src/kudu/common/generic_iterators-test.cc @@ -23,6 +23,7 @@ #include "kudu/common/iterator.h" #include "kudu/common/generic_iterators.h" +#include "kudu/common/column_materialization_context.h" #include "kudu/common/rowblock.h" #include "kudu/common/scan_spec.h" #include "kudu/common/schema.h" @@ -62,7 +63,7 @@ class VectorIterator : public ColumnwiseIterator { return Status::OK(); } - virtual Status PrepareBatch(size_t *nrows) OVERRIDE { + virtual Status PrepareBatch(size_t* nrows) OVERRIDE { prepared_ = std::min<int64_t>({ static_cast<int64_t>(ints_.size()) - cur_idx_, block_size_, @@ -76,12 +77,13 @@ class VectorIterator : public ColumnwiseIterator { return Status::OK(); } - virtual Status 
MaterializeColumn(size_t col, ColumnBlock *dst) OVERRIDE { - CHECK_EQ(UINT32, dst->type_info()->physical_type()); - DCHECK_LE(prepared_, dst->nrows()); + Status MaterializeColumn(ColumnMaterializationContext* ctx) override { + ctx->SetDecoderEvalNotSupported(); + CHECK_EQ(UINT32, ctx->block()->type_info()->physical_type()); + DCHECK_LE(prepared_, ctx->block()->nrows()); for (size_t i = 0; i < prepared_; i++) { - dst->SetCellValue(i, &(ints_[cur_idx_++])); + ctx->block()->SetCellValue(i, &(ints_[cur_idx_++])); } return Status::OK(); http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/generic_iterators.cc ---------------------------------------------------------------------- diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc index 1d54e0c..e431510 100644 --- a/src/kudu/common/generic_iterators.cc +++ b/src/kudu/common/generic_iterators.cc @@ -21,6 +21,8 @@ #include <tuple> #include <utility> +#include "kudu/common/column_materialization_context.h" +#include "kudu/common/column_predicate.h" #include "kudu/common/generic_iterators.h" #include "kudu/common/row.h" #include "kudu/common/rowblock.h" @@ -43,6 +45,10 @@ using std::tuple; DEFINE_bool(materializing_iterator_do_pushdown, true, "Should MaterializingIterator do predicate pushdown"); TAG_FLAG(materializing_iterator_do_pushdown, hidden); +DEFINE_bool(materializing_iterator_decoder_eval, true, + "Should MaterializingIterator do decoder-level evaluation"); +TAG_FLAG(materializing_iterator_decoder_eval, hidden); +TAG_FLAG(materializing_iterator_decoder_eval, runtime); namespace kudu { @@ -436,7 +442,8 @@ void UnionIterator::GetIteratorStats(std::vector<IteratorStats>* stats) const { MaterializingIterator::MaterializingIterator(shared_ptr<ColumnwiseIterator> iter) : iter_(move(iter)), - disallow_pushdown_for_tests_(!FLAGS_materializing_iterator_do_pushdown) { + disallow_pushdown_for_tests_(!FLAGS_materializing_iterator_do_pushdown), + 
disallow_decoder_eval_(!FLAGS_materializing_iterator_decoder_eval) { } Status MaterializingIterator::Init(ScanSpec *spec) { @@ -512,10 +519,19 @@ Status MaterializingIterator::MaterializeBlock(RowBlock *dst) { for (const auto& col_pred : col_idx_predicates_) { // Materialize the column itself into the row block. ColumnBlock dst_col(dst->column_block(get<0>(col_pred))); - RETURN_NOT_OK(iter_->MaterializeColumn(get<0>(col_pred), &dst_col)); - - // Evaluate the column predicate. - get<1>(col_pred).Evaluate(dst_col, dst->selection_vector()); + ColumnMaterializationContext ctx(get<0>(col_pred), + &get<1>(col_pred), + &dst_col, + dst->selection_vector()); + // None predicates should be short-circuited in scan spec. + DCHECK(ctx.pred()->predicate_type() != PredicateType::None); + if (disallow_decoder_eval_) { + ctx.SetDecoderEvalNotSupported(); + } + RETURN_NOT_OK(iter_->MaterializeColumn(&ctx)); + if (ctx.DecoderEvalNotSupported()) { + get<1>(col_pred).Evaluate(dst_col, dst->selection_vector()); + } // If after evaluating this predicate the entire row block has been filtered // out, we don't need to materialize other columns at all. @@ -528,7 +544,11 @@ Status MaterializingIterator::MaterializeBlock(RowBlock *dst) { for (size_t col_idx : non_predicate_column_indexes_) { // Materialize the column itself into the row block. 
ColumnBlock dst_col(dst->column_block(col_idx)); - RETURN_NOT_OK(iter_->MaterializeColumn(col_idx, &dst_col)); + ColumnMaterializationContext ctx(col_idx, + nullptr, + &dst_col, + dst->selection_vector()); + RETURN_NOT_OK(iter_->MaterializeColumn(&ctx)); } DVLOG(1) << dst->selection_vector()->CountSelected() << "/" http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/generic_iterators.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/generic_iterators.h b/src/kudu/common/generic_iterators.h index 3356a6f..d872aa7 100644 --- a/src/kudu/common/generic_iterators.h +++ b/src/kudu/common/generic_iterators.h @@ -175,6 +175,7 @@ class MaterializingIterator : public RowwiseIterator { // Set only by test code to disallow pushdown. bool disallow_pushdown_for_tests_; + bool disallow_decoder_eval_; }; // An iterator which wraps another iterator and evaluates any predicates that the http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/iterator.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/iterator.h b/src/kudu/common/iterator.h index ac7df92..0fb3cc7 100644 --- a/src/kudu/common/iterator.h +++ b/src/kudu/common/iterator.h @@ -30,6 +30,7 @@ namespace kudu { class Arena; +class ColumnMaterializationContext; class RowBlock; class ScanSpec; @@ -92,12 +93,12 @@ class ColumnwiseIterator : public virtual IteratorBase { // is at the end of the available data. virtual Status PrepareBatch(size_t *nrows) = 0; - // Materialize the given column into the given column block. + // Materialize the given column into the given ctx's column block. // col_idx is within the projection schema, not the underlying schema. // // Any indirect data (eg strings) are copied into the destination block's // arena, if non-null. 
- virtual Status MaterializeColumn(size_t col_idx, ColumnBlock *dst) = 0; + virtual Status MaterializeColumn(ColumnMaterializationContext* ctx) = 0; // Finish the current batch. virtual Status FinishBatch() = 0; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/rowblock.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h index c627ad9..1ce76fd 100644 --- a/src/kudu/common/rowblock.h +++ b/src/kudu/common/rowblock.h @@ -118,6 +118,40 @@ class SelectionVector { gscoped_array<uint8_t> bitmap_; }; +// A SelectionVectorView keeps track of where in the selection vector a given +// batch will start from. After processing a batch, Advance() should be called +// and the view will move forward by the appropriate amount. In this way, the +// underlying selection vector can easily be updated batch-by-batch. +class SelectionVectorView { + public: + explicit SelectionVectorView(SelectionVector *sel_vec) + : sel_vec_(sel_vec), row_offset_(0) { + } + void Advance(size_t skip) { + DCHECK_LE(skip, sel_vec_->nrows() - row_offset_); + row_offset_ += skip; + } + void SetBit(size_t row_idx) { + DCHECK_LE(row_idx, sel_vec_->nrows() - row_offset_); + BitmapSet(sel_vec_->mutable_bitmap(), row_offset_ + row_idx); + } + void ClearBit(size_t row_idx) { + DCHECK_LE(row_idx, sel_vec_->nrows() - row_offset_); + BitmapClear(sel_vec_->mutable_bitmap(), row_offset_ + row_idx); + } + bool TestBit(size_t row_idx) { + DCHECK_LE(row_idx, sel_vec_->nrows() - row_offset_); + return BitmapTest(sel_vec_->bitmap(), row_offset_ + row_idx); + } + void ClearBits(size_t nrows) { + DCHECK_LE(nrows, sel_vec_->nrows() - row_offset_); + BitmapChangeBits(sel_vec_->mutable_bitmap(), row_offset_, nrows, false); + } + private: + SelectionVector* sel_vec_; + size_t row_offset_; +}; + // A block of decoded rows. // Wrapper around a buffer, which keeps the buffer's size, associated arena, // and schema. 
Provides convenience accessors for indexing by row, column, etc. http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/common/schema.h ---------------------------------------------------------------------- diff --git a/src/kudu/common/schema.h b/src/kudu/common/schema.h index bebaff5..b8656e9 100644 --- a/src/kudu/common/schema.h +++ b/src/kudu/common/schema.h @@ -88,6 +88,12 @@ struct ColumnStorageAttributes { cfile_block_size(0) { } + ColumnStorageAttributes(EncodingType enc, CompressionType cmp) + : encoding(enc), + compression(cmp), + cfile_block_size(0) { + } + string ToString() const; EncodingType encoding; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/CMakeLists.txt b/src/kudu/tablet/CMakeLists.txt index 402b2c8..9e8a00f 100644 --- a/src/kudu/tablet/CMakeLists.txt +++ b/src/kudu/tablet/CMakeLists.txt @@ -103,6 +103,7 @@ ADD_KUDU_TEST(deltafile-test) ADD_KUDU_TEST(cfile_set-test) ADD_KUDU_TEST(tablet-pushdown-test) ADD_KUDU_TEST(tablet-schema-test) +ADD_KUDU_TEST(tablet-decoder-eval-test) ADD_KUDU_TEST(tablet_bootstrap-test) ADD_KUDU_TEST(metadata-test) ADD_KUDU_TEST(mvcc-test) http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/cfile_set-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc index 7ecba71..c66593b 100644 --- a/src/kudu/tablet/cfile_set-test.cc +++ b/src/kudu/tablet/cfile_set-test.cc @@ -104,6 +104,14 @@ class TestCFileSet : public KuduRowSetTest { } } + Status MaterializeColumn(CFileSet::Iterator *iter, + size_t col_idx, + ColumnBlock *cb) { + SelectionVector sel(cb->nrows()); + ColumnMaterializationContext ctx(col_idx, nullptr, cb, &sel); + return iter->MaterializeColumn(&ctx); + } + private: ColumnStorageAttributes GetRLEStorage() const { 
ColumnStorageAttributes attr; @@ -148,7 +156,7 @@ TEST_F(TestCFileSet, TestPartiallyMaterialize) { int cycle = (row_idx / kCycleInterval) % 3; if (cycle == 0 || cycle == 2) { ColumnBlock col(block.column_block(0)); - ASSERT_OK_FAST(iter->MaterializeColumn(0, &col)); + ASSERT_OK_FAST(MaterializeColumn(iter.get(), 0, &col)); // Verify for (int i = 0; i < n; i++) { @@ -162,7 +170,7 @@ TEST_F(TestCFileSet, TestPartiallyMaterialize) { } if (cycle == 1 || cycle == 2) { ColumnBlock col(block.column_block(1)); - ASSERT_OK_FAST(iter->MaterializeColumn(1, &col)); + ASSERT_OK_FAST(MaterializeColumn(iter.get(), 1, &col)); // Verify for (int i = 0; i < n; i++) { http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/cfile_set.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/cfile_set.cc b/src/kudu/tablet/cfile_set.cc index b65d20d..0719da6 100644 --- a/src/kudu/tablet/cfile_set.cc +++ b/src/kudu/tablet/cfile_set.cc @@ -23,6 +23,7 @@ #include "kudu/cfile/cfile_util.h" #include "kudu/cfile/cfile_writer.h" #include "kudu/common/scan_spec.h" +#include "kudu/common/column_materialization_context.h" #include "kudu/gutil/dynamic_annotations.h" #include "kudu/gutil/map-util.h" #include "kudu/gutil/stl_util.h" @@ -30,6 +31,7 @@ #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/cfile_set.h" #include "kudu/util/flag_tags.h" +#include "kudu/util/logging.h" DEFINE_bool(consult_bloom_filters, true, "Whether to consult bloom filters on row presence checks"); TAG_FLAG(consult_bloom_filters, hidden); @@ -400,14 +402,13 @@ Status CFileSet::Iterator::PrepareBatch(size_t *n) { return Status::OK(); } - -Status CFileSet::Iterator::PrepareColumn(size_t idx) { - if (cols_prepared_[idx]) { +Status CFileSet::Iterator::PrepareColumn(ColumnMaterializationContext *ctx) { + if (cols_prepared_[ctx->col_idx()]) { // Already prepared in this batch. 
return Status::OK(); } - ColumnIterator* col_iter = col_iters_[idx]; + ColumnIterator* col_iter = col_iters_[ctx->col_idx()]; size_t n = prepared_count_; if (!col_iter->seeked() || col_iter->GetCurrentOrdinal() != cur_idx_) { @@ -421,18 +422,19 @@ Status CFileSet::Iterator::PrepareColumn(size_t idx) { Status s = col_iter->PrepareBatch(&n); if (!s.ok()) { - LOG(WARNING) << "Unable to prepare column " << idx << ": " << s.ToString(); + LOG(WARNING) << "Unable to prepare column " << ctx->col_idx() << ": " << s.ToString(); return s; } if (n != prepared_count_) { return Status::Corruption( - StringPrintf("Column %zd (%s) didn't yield enough rows at offset %zd: expected " - "%zd but only got %zd", idx, projection_->column(idx).ToString().c_str(), - cur_idx_, prepared_count_, n)); + StringPrintf("Column %zd (%s) didn't yield enough rows at offset %zd: expected " + "%zd but only got %zd", ctx->col_idx(), + projection_->column(ctx->col_idx()).ToString().c_str(), + cur_idx_, prepared_count_, n)); } - cols_prepared_[idx] = true; + cols_prepared_[ctx->col_idx()] = true; return Status::OK(); } @@ -442,13 +444,16 @@ Status CFileSet::Iterator::InitializeSelectionVector(SelectionVector *sel_vec) { return Status::OK(); } -Status CFileSet::Iterator::MaterializeColumn(size_t col_idx, ColumnBlock *dst) { - CHECK_EQ(prepared_count_, dst->nrows()); - DCHECK_LT(col_idx, col_iters_.size()); +Status CFileSet::Iterator::MaterializeColumn(ColumnMaterializationContext *ctx) { + CHECK_EQ(prepared_count_, ctx->block()->nrows()); + DCHECK_LT(ctx->col_idx(), col_iters_.size()); + + RETURN_NOT_OK(PrepareColumn(ctx)); + ColumnIterator* iter = col_iters_[ctx->col_idx()]; - RETURN_NOT_OK(PrepareColumn(col_idx)); - ColumnIterator* iter = col_iters_[col_idx]; - return iter->Scan(dst); + RETURN_NOT_OK(iter->Scan(ctx)); + + return Status::OK(); } Status CFileSet::Iterator::FinishBatch() { http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/cfile_set.h 
---------------------------------------------------------------------- diff --git a/src/kudu/tablet/cfile_set.h b/src/kudu/tablet/cfile_set.h index 9979069..7d46cf5 100644 --- a/src/kudu/tablet/cfile_set.h +++ b/src/kudu/tablet/cfile_set.h @@ -144,7 +144,7 @@ class CFileSet::Iterator : public ColumnwiseIterator { virtual Status InitializeSelectionVector(SelectionVector *sel_vec) OVERRIDE; - virtual Status MaterializeColumn(size_t col_idx, ColumnBlock *dst) OVERRIDE; + Status MaterializeColumn(ColumnMaterializationContext *ctx) override; virtual Status FinishBatch() OVERRIDE; @@ -197,7 +197,7 @@ class CFileSet::Iterator : public ColumnwiseIterator { void Unprepare(); // Prepare the given column if not already prepared. - Status PrepareColumn(size_t col_idx); + Status PrepareColumn(ColumnMaterializationContext *ctx); const std::shared_ptr<CFileSet const> base_data_; const Schema* projection_; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/delta_applier.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_applier.cc b/src/kudu/tablet/delta_applier.cc index 35afef7..a968122 100644 --- a/src/kudu/tablet/delta_applier.cc +++ b/src/kudu/tablet/delta_applier.cc @@ -20,6 +20,7 @@ #include <string> #include <vector> +#include "kudu/common/column_materialization_context.h" #include "kudu/common/iterator.h" #include "kudu/tablet/delta_store.h" #include "kudu/util/status.h" @@ -93,14 +94,17 @@ Status DeltaApplier::InitializeSelectionVector(SelectionVector *sel_vec) { return delta_iter_->ApplyDeletes(sel_vec); } -Status DeltaApplier::MaterializeColumn(size_t col_idx, ColumnBlock *dst) { +Status DeltaApplier::MaterializeColumn(ColumnMaterializationContext *ctx) { DCHECK(!first_prepare_) << "PrepareBatch() must be called at least once"; + // Data with updates cannot be evaluated at the decoder-level. 
+ if (delta_iter_->MayHaveDeltas()) { + ctx->SetDecoderEvalNotSupported(); + RETURN_NOT_OK(base_iter_->MaterializeColumn(ctx)); + RETURN_NOT_OK(delta_iter_->ApplyUpdates(ctx->col_idx(), ctx->block())); + } else { + RETURN_NOT_OK(base_iter_->MaterializeColumn(ctx)); + } - // Copy the base data. - RETURN_NOT_OK(base_iter_->MaterializeColumn(col_idx, dst)); - - // Apply all the updates for this column. - RETURN_NOT_OK(delta_iter_->ApplyUpdates(col_idx, dst)); return Status::OK(); } http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/delta_applier.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_applier.h b/src/kudu/tablet/delta_applier.h index b5bcca3..1f2cb01 100644 --- a/src/kudu/tablet/delta_applier.h +++ b/src/kudu/tablet/delta_applier.h @@ -61,7 +61,7 @@ class DeltaApplier : public ColumnwiseIterator { // All other rows are set to 1. virtual Status InitializeSelectionVector(SelectionVector *sel_vec) OVERRIDE; - Status MaterializeColumn(size_t col_idx, ColumnBlock *dst) OVERRIDE; + Status MaterializeColumn(ColumnMaterializationContext *ctx) override; private: friend class DeltaTracker; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/delta_iterator_merger.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc index 582917f..d19aa62 100644 --- a/src/kudu/tablet/delta_iterator_merger.cc +++ b/src/kudu/tablet/delta_iterator_merger.cc @@ -111,6 +111,15 @@ bool DeltaIteratorMerger::HasNext() { return false; } +bool DeltaIteratorMerger::MayHaveDeltas() { + for (const unique_ptr<DeltaIterator>& iter : iters_) { + if (iter->MayHaveDeltas()) { + return true; + } + } + return false; +} + string DeltaIteratorMerger::ToString() const { string ret; ret.append("DeltaIteratorMerger("); 
http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/delta_iterator_merger.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h index be7f746..f1e5cef 100644 --- a/src/kudu/tablet/delta_iterator_merger.h +++ b/src/kudu/tablet/delta_iterator_merger.h @@ -57,6 +57,7 @@ class DeltaIteratorMerger : public DeltaIterator { vector<DeltaKeyAndUpdate>* out, Arena* arena) OVERRIDE; virtual bool HasNext() OVERRIDE; + bool MayHaveDeltas() override; virtual std::string ToString() const OVERRIDE; private: http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/delta_store.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h index b0ab4b2..64dae2c 100644 --- a/src/kudu/tablet/delta_store.h +++ b/src/kudu/tablet/delta_store.h @@ -173,6 +173,12 @@ class DeltaIterator { // Returns true if there are any more rows left in this iterator. virtual bool HasNext() = 0; + // Returns true if there might exist deltas to be applied. It is safe to + // conservatively return true, but this would force a skip over decoder-level + // evaluation. + // Must have called PrepareBatch() with flag = PREPARE_FOR_APPLY. + virtual bool MayHaveDeltas() = 0; + // Return a string representation suitable for debug printouts. 
virtual std::string ToString() const = 0; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/deltafile.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc index e86f970..08a8ffa 100644 --- a/src/kudu/tablet/deltafile.cc +++ b/src/kudu/tablet/deltafile.cc @@ -817,6 +817,22 @@ bool DeltaFileIterator::HasNext() { return !exhausted_ || !delta_blocks_.empty(); } +bool DeltaFileIterator::MayHaveDeltas() { + // TODO: change the API to take in the col_to_apply and check for deltas on + // that column only. + DCHECK(prepared_) << "must Prepare"; + for (auto& block : delta_blocks_) { + BinaryPlainBlockDecoder& bpd = *block->decoder_; + if (PREDICT_FALSE(prepared_idx_ > block->last_updated_idx_)) { + continue; + } + if (block->prepared_block_start_idx_ < bpd.Count()) { + return true; + } + } + return false; +} + string DeltaFileIterator::ToString() const { return "DeltaFileIterator(" + dfr_->ToString() + ")"; } http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/deltafile.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h index cdfdcd7..a7091e0 100644 --- a/src/kudu/tablet/deltafile.h +++ b/src/kudu/tablet/deltafile.h @@ -202,6 +202,7 @@ class DeltaFileIterator : public DeltaIterator { Arena* arena) OVERRIDE; string ToString() const OVERRIDE; virtual bool HasNext() OVERRIDE; + bool MayHaveDeltas() override; private: friend class DeltaFileReader; http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/deltamemstore.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc index 3db3f26..7653867 100644 --- a/src/kudu/tablet/deltamemstore.cc +++ b/src/kudu/tablet/deltamemstore.cc @@ -363,6 +363,18 @@ bool DMSIterator::HasNext() { return 
false; } +bool DMSIterator::MayHaveDeltas() { + if (!deletes_and_reinserts_.empty()) { + return true; + } + for (auto& col: updates_by_col_) { + if (!col.empty()) { + return true; + } + } + return false; +} + string DMSIterator::ToString() const { return "DMSIterator"; } http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/deltamemstore.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h index 5228efe..a9bdf51 100644 --- a/src/kudu/tablet/deltamemstore.h +++ b/src/kudu/tablet/deltamemstore.h @@ -203,6 +203,8 @@ class DMSIterator : public DeltaIterator { virtual bool HasNext() OVERRIDE; + bool MayHaveDeltas() override; + private: DISALLOW_COPY_AND_ASSIGN(DMSIterator); FRIEND_TEST(TestDeltaMemStore, TestIteratorDoesUpdates); http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/tablet-decoder-eval-test.cc ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet-decoder-eval-test.cc b/src/kudu/tablet/tablet-decoder-eval-test.cc new file mode 100644 index 0000000..e35f416 --- /dev/null +++ b/src/kudu/tablet/tablet-decoder-eval-test.cc @@ -0,0 +1,302 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <glog/logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/strings/substitute.h" +#include "kudu/util/flags.h" + +#include "kudu/common/schema.h" +#include "kudu/tablet/tablet-test-base.h" + +namespace kudu { +namespace tablet { + +using strings::Substitute; + +enum Setup { +#ifdef ADDRESS_SANITIZER + SMALL = 100, MEDIUM = 3000, LARGE = 10000 +#else + SMALL = 100, MEDIUM = 5000, LARGE = 100000 +#endif +}; + +DEFINE_int32(decoder_eval_test_nrepeats, 1, "Number of times to repeat per tablet"); +DEFINE_int32(decoder_eval_test_lower, 0, "Lower bound on the predicate [lower, upper)"); +DEFINE_int32(decoder_eval_test_upper, 50, "Upper bound on the predicate [lower, upper)"); +DEFINE_int32(decoder_eval_test_strlen, 10, "Minimum length of the zero-padded string in each cell"); + +class TabletDecoderEvalTest : public KuduTabletTest, + public ::testing::WithParamInterface<Setup> { +public: + TabletDecoderEvalTest() + : KuduTabletTest(Schema({ColumnSchema("key", INT32), + ColumnSchema("string_val_a", STRING, true, NULL, NULL, + ColumnStorageAttributes(DICT_ENCODING, + DEFAULT_COMPRESSION)), + ColumnSchema("string_val_b", STRING, true, NULL, NULL, + ColumnStorageAttributes(DICT_ENCODING, + DEFAULT_COMPRESSION))}, 1)) + {} + + void SetUp() override { + KuduTabletTest::SetUp(); + } + + void ScanAndFilter(size_t cardinality, size_t lower, size_t upper, int null_upper) { + if (GetParam() == LARGE && !AllowSlowTests()) { + LOG(INFO) << "Skipped large test case"; + return; + } + size_t nrows = static_cast<size_t>(GetParam()); + // The correctness check of this test requires that the int and string + // comparators for the values in the tablets match up. Adjust the lengths + // of the strings to enforce this. + // e.g. scanning ["00", ..., "99"] for > "111" would return numerically + // incorrect values, but ["000", ..., "099"] would return correct values.
+ size_t strlen = std::max({static_cast<size_t>(FLAGS_decoder_eval_test_strlen), + Substitute("$0", upper).length(), + Substitute("$0", cardinality).length()}); + + // Fill tablet, a negative null_upper will yield no NULLs. + FillTestTablet(nrows, cardinality, strlen, null_upper); + int fetched = 0; + size_t lower_not_null = lower; + if (null_upper > static_cast<int>(lower)) { + // If null_upper is greater than the lower bound, the expected results + // will be calculated with null_upper as the lower bound. + // e.g. null_upper: 3, lower: 2, upper: 6 + // NULL, NULL, NULL, "3", "4", "5", NULL, NULL, NULL, "3", "4", "5", ... + // Expected result will be calculated as if the query were [3, 6). + lower_not_null = null_upper; + } + + for (int i = 0; i < FLAGS_decoder_eval_test_nrepeats; i++) { + TestTimedScanWithBounds(nrows, cardinality, strlen, lower, upper, &fetched); + + // Calculate the expected count, potentially factoring in nulls. + size_t expected_sel_count = ExpectedCount(nrows, cardinality, lower_not_null, upper); + ASSERT_EQ(expected_sel_count, fetched); + LOG(INFO) << "Nrows: " << nrows + << ", Strlen: " << strlen + << ", Expected: " << expected_sel_count + << ", Actual: " << fetched; + } + } + + void TestScanAndFilter(size_t cardinality, size_t lower, size_t upper) { + ScanAndFilter(cardinality, lower, upper, -1); + } + + void TestNullableScanAndFilter(size_t cardinality, size_t lower, size_t upper, int null_upper) { + ScanAndFilter(cardinality, lower, upper, null_upper); + } + + void FillTestTablet(size_t nrows, size_t cardinality, size_t strlen, int null_upper) { + RowBuilder rb(client_schema_); + LocalTabletWriter writer(tablet().get(), &client_schema_); + KuduPartialRow row(&client_schema_); + for (int64_t i = 0; i < nrows; i++) { + CHECK_OK(row.SetInt32(0, i)); + + // Populate the bottom of the repeating pattern with NULLs. + // Note: Negative null_upper will yield a completely non-NULL column. 
+ if (static_cast<int>(i % cardinality) < null_upper) { + CHECK_OK(row.SetNull(1)); + CHECK_OK(row.SetNull(2)); + } else { + CHECK_OK(row.SetStringCopy(1, LeftZeroPadded(i % cardinality, strlen))); + CHECK_OK(row.SetStringCopy(2, LeftZeroPadded(i % cardinality, strlen))); + } + ASSERT_OK_FAST(writer.Insert(row)); + } + ASSERT_OK(tablet()->Flush()); + } + + void TestTimedScanWithBounds(size_t nrows, size_t cardinality, size_t strlen, size_t lower_val, + size_t upper_val, int* fetched) { + Arena arena(128, 1028); + AutoReleasePool pool; + ScanSpec spec; + + // Generate the predicate. + const std::string lower_string = LeftZeroPadded(lower_val, strlen); + const std::string upper_string = LeftZeroPadded(upper_val, strlen); + Slice lower(lower_string); + Slice upper(upper_string); + auto string_pred = ColumnPredicate::Range(schema_.column(2), &lower, &upper); + + // Prepare the scan. + spec.AddPredicate(string_pred); + spec.OptimizeScan(schema_, &arena, &pool, true); + ScanSpec orig_spec = spec; + gscoped_ptr<RowwiseIterator> iter; + ASSERT_OK(tablet()->NewRowIterator(client_schema_, &iter)); + spec = orig_spec; + ASSERT_OK(iter->Init(&spec)); + ASSERT_TRUE(spec.predicates().empty()) << "Should have accepted all predicates"; + + // Execute and time the scan. Argument fetched is an output and will be set + // to the number of rows returned in the result set. + LOG_TIMING(INFO, "Filtering by string value") { + ASSERT_OK(SilentIterateToStringList(iter.get(), fetched)); + } + } + + size_t ExpectedCount(size_t nrows, size_t cardinality, size_t lower, size_t upper) { + if (lower >= upper || lower >= cardinality) { + return 0; + } + lower = std::max(static_cast<size_t>(0), lower); + upper = std::min(cardinality, upper); + size_t last_chunk_count = 0; + size_t last_chunk_size = nrows % cardinality; + + if (last_chunk_size == 0 || last_chunk_size <= lower) { + // e.g. 
lower: 3, upper: 8, cardinality:10, nrows: 23, last_chunk_size: 3 + // Resulting vector: [0001111100|0001111100|000] + last_chunk_count = 0; + } else if (last_chunk_size <= upper) { + // e.g. lower: 3, upper: 8, cardinality:10, nrows: 25, last_chunk_size: 5 + // Resulting vector: [0001111100|0001111100|00011] + last_chunk_count = last_chunk_size - lower; + } else { + // e.g. lower: 3, upper: 8, cardinality:10, nrows: 29, last_chunk_size: 9 + // Resulting vector: [0001111100|0001111100|000111110] + last_chunk_count = upper - lower; + } + return (nrows / cardinality) * (upper - lower) + last_chunk_count; + } + + std::string LeftZeroPadded(size_t n, size_t strlen) { + // Assumes the string representation of n is under strlen characters. + return StringPrintf(Substitute("%0$0$1", strlen, PRId64).c_str(), static_cast<int64_t>(n)); + } + + void TestMultipleColumnPredicates(size_t cardinality, size_t lower, size_t upper) { + if (GetParam() == LARGE && !AllowSlowTests()) { + LOG(INFO) << "Skipped large test case"; + return; + } + size_t nrows = static_cast<size_t>(GetParam()); + size_t strlen = std::max({static_cast<size_t>(FLAGS_decoder_eval_test_strlen), + Substitute("$0", upper).length(), + Substitute("$0", cardinality).length()}); + FillTestTablet(nrows, cardinality, strlen, -1); + Arena arena(128, 1028); + AutoReleasePool pool; + ScanSpec spec; + + // Generate the predicates [0, upper) AND [lower, cardinality). + const std::string lower_string_a(LeftZeroPadded(0, strlen)); + const std::string upper_string_a(LeftZeroPadded(upper, strlen)); + Slice lower_a(lower_string_a); + Slice upper_a(upper_string_a); + const std::string lower_string_b = LeftZeroPadded(lower, strlen); + const std::string upper_string_b = LeftZeroPadded(cardinality, strlen); + Slice lower_b(lower_string_b); + Slice upper_b(upper_string_b); + + // This will exercise CopyNextAndEval's skipping behavior in the decoders + // that support evaluation.
Decoders should skip over rows that have been + // deemed to not be returned by a prior column evaluation. + auto string_pred_a = ColumnPredicate::Range(schema_.column(1), &lower_a, &upper_a); + auto string_pred_b = ColumnPredicate::Range(schema_.column(2), &lower_b, &upper_b); + + // Prepare the scan. + spec.AddPredicate(string_pred_a); + spec.AddPredicate(string_pred_b); + spec.OptimizeScan(schema_, &arena, &pool, true); + ScanSpec orig_spec = spec; + gscoped_ptr<RowwiseIterator> iter; + ASSERT_OK(tablet()->NewRowIterator(client_schema_, &iter)); + spec = orig_spec; + ASSERT_OK(iter->Init(&spec)); + ASSERT_TRUE(spec.predicates().empty()) << "Should have accepted all predicates"; + + Arena ret_arena(1028, 1028); + size_t expected_count = ExpectedCount(nrows, cardinality, lower, upper); + Schema schema = iter->schema(); + RowBlock block(schema, 100, &ret_arena); + int fetched = 0; + std::string column_str_a, column_str_b; + while (iter->HasNext()) { + ASSERT_OK(iter->NextBlock(&block)); + for (size_t i = 0; i < block.nrows(); i++) { + if (block.selection_vector()->IsRowSelected(i)) { + column_str_a = schema.column(1).Stringify(block.row(i).cell(1).ptr()); + column_str_b = schema.column(2).Stringify(block.row(i).cell(2).ptr()); + // Correct skipping should yield matching strings between columns. 
+ ASSERT_EQ(column_str_a, column_str_b); + fetched++; + } + } + } + ASSERT_EQ(expected_count, fetched); + } +}; + +TEST_P(TabletDecoderEvalTest, LowCardinality) { + TestScanAndFilter(50, FLAGS_decoder_eval_test_lower, FLAGS_decoder_eval_test_upper); +} + +TEST_P(TabletDecoderEvalTest, MidCardinality) { + TestScanAndFilter(1000, FLAGS_decoder_eval_test_lower, FLAGS_decoder_eval_test_upper); +} + +TEST_P(TabletDecoderEvalTest, HighCardinality) { + TestScanAndFilter(50000, FLAGS_decoder_eval_test_lower, FLAGS_decoder_eval_test_upper); +} + +TEST_P(TabletDecoderEvalTest, EvaluateEmpty) { + // Predicate [k, k+5) will not evaluate to None, but will return no rows. + TestScanAndFilter(50, 50, 55); +} + +TEST_P(TabletDecoderEvalTest, NullableLowCardinality) { + // Fill a tablet with pattern [0, 50) but with values [0, 40) as NULL. + // Query for values [30, 50). + TestNullableScanAndFilter(50, 30, 50, 40); +} + +TEST_P(TabletDecoderEvalTest, NullableMidCardinality) { + // Fill a tablet with pattern [0, 1000) but with values [0, 50) as NULL. + // Query for values [30, 100). + TestNullableScanAndFilter(1000, 30, 100, 50); +} + +TEST_P(TabletDecoderEvalTest, NullableHighCardinality) { + // Fill a tablet with pattern [0, 50000) but with values [0, 75) as NULL. + // Query for values [30, 200). + TestNullableScanAndFilter(50000, 30, 200, 75); +} + +TEST_P(TabletDecoderEvalTest, MultipleColumns) { + // Fill a tablet with pattern [0, 10) and query a:[0, 5) AND b:[3, 10). + // To be considered correct, returned columns must align as they do in the + // table and the correct number of rows must be returned.
+ TestMultipleColumnPredicates(10, 3, 5); +} + +INSTANTIATE_TEST_CASE_P(DecoderEvaluation, TabletDecoderEvalTest, ::testing::Values(SMALL, + MEDIUM, + LARGE)); + +} // namespace tablet +} // namespace kudu http://git-wip-us.apache.org/repos/asf/kudu/blob/c0f37278/src/kudu/tablet/tablet-test-util.h ---------------------------------------------------------------------- diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h index 1b25538..7cc8516 100644 --- a/src/kudu/tablet/tablet-test-util.h +++ b/src/kudu/tablet/tablet-test-util.h @@ -139,8 +139,27 @@ class KuduRowSetTest : public KuduTabletTest { std::shared_ptr<RowSetMetadata> rowset_meta_; }; -static inline Status IterateToStringList(RowwiseIterator *iter, - vector<string> *out, +// Iterate through the values without outputting them at the end +// This is strictly a measure of decoding and evaluating predicates +static inline Status SilentIterateToStringList(RowwiseIterator* iter, + int* fetched) { + const Schema& schema = iter->schema(); + Arena arena(1024, 1024); + RowBlock block(schema, 100, &arena); + *fetched = 0; + while (iter->HasNext()) { + RETURN_NOT_OK(iter->NextBlock(&block)); + for (size_t i = 0; i < block.nrows(); i++) { + if (block.selection_vector()->IsRowSelected(i)) { + (*fetched)++; + } + } + } + return Status::OK(); +} + +static inline Status IterateToStringList(RowwiseIterator* iter, + vector<string>* out, int limit = INT_MAX) { out->clear(); Schema schema = iter->schema();
