This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push: new cbf00628c ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073 cbf00628c is described below commit cbf00628c4d5ec8287eae419edf7e5dede4d07cf Author: mingshen.zx <mingshen...@alibaba-inc.com> AuthorDate: Tue Mar 29 15:17:20 2022 +0800 ORC-969: [C++] Evaluate SearchArguments using file and stripe level stats #1073 Predicate pushdown (PPD) now uses file-level and stripe-level statistics to filter file content, making better use of those statistics. Tested with the unit test testStripeAndFileStats. Closes #1073 Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- c++/src/Reader.cc | 59 ++++++++++++++++++--------- c++/src/Reader.hh | 2 +- c++/src/sargs/SargsApplier.cc | 53 +++++++++++++++++++++++- c++/src/sargs/SargsApplier.hh | 20 +++++++++ c++/test/TestPredicatePushdown.cc | 85 +++++++++++++++++++++++++++++++++++++++ c++/test/TestSargsApplier.cc | 71 ++++++++++++++++++++++++++++++++ 6 files changed, 270 insertions(+), 20 deletions(-) diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc index 34cc95052..b79f2132c 100644 --- a/c++/src/Reader.cc +++ b/c++/src/Reader.cc @@ -591,8 +591,8 @@ namespace orc { if (!isMetadataLoaded) { readMetadata(); } - return metadata.get() == nullptr ? 0 : - static_cast<uint64_t>(metadata->stripestats_size()); + return contents->metadata == nullptr ? 
0 : + static_cast<uint64_t>(contents->metadata->stripestats_size()); } std::unique_ptr<StripeInformation> @@ -767,11 +767,11 @@ namespace orc { if (!isMetadataLoaded) { readMetadata(); } - if (metadata.get() == nullptr) { + if (contents->metadata == nullptr) { throw std::logic_error("No stripe statistics in file"); } size_t num_cols = static_cast<size_t>( - metadata->stripestats( + contents->metadata->stripestats( static_cast<int>(stripeIndex)).colstats_size()); std::vector<std::vector<proto::ColumnStatistics> > indexStats(num_cols); @@ -788,7 +788,7 @@ namespace orc { getLocalTimezone(); StatContext statContext(hasCorrectStatistics(), &writerTZ); return std::unique_ptr<StripeStatistics> - (new StripeStatisticsImpl(metadata->stripestats(static_cast<int>(stripeIndex)), + (new StripeStatisticsImpl(contents->metadata->stripestats(static_cast<int>(stripeIndex)), indexStats, statContext)); } @@ -831,8 +831,8 @@ namespace orc { *contents->pool)), contents->blockSize, *contents->pool); - metadata.reset(new proto::Metadata()); - if (!metadata->ParseFromZeroCopyStream(pbStream.get())) { + contents->metadata.reset(new proto::Metadata()); + if (!contents->metadata->ParseFromZeroCopyStream(pbStream.get())) { throw ParseError("Failed to parse the metadata"); } } @@ -860,6 +860,10 @@ namespace orc { std::unique_ptr<RowReader> ReaderImpl::createRowReader( const RowReaderOptions& opts) const { + if (opts.getSearchArgument() && !isMetadataLoaded) { + // load stripe statistics for PPD + readMetadata(); + } return std::unique_ptr<RowReader>(new RowReaderImpl(contents, opts)); } @@ -1034,6 +1038,15 @@ namespace orc { rowIndexes.clear(); bloomFilterIndex.clear(); + // evaluate file statistics if it exists + if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer)) { + // skip the entire file + currentStripe = lastStripe; + currentRowInStripe = 0; + rowsInCurrentStripe = 0; + return; + } + do { currentStripeInfo = footer->stripes(static_cast<int>(currentStripe)); uint64_t 
fileLength = contents->stream->getLength(); @@ -1050,16 +1063,26 @@ namespace orc { rowsInCurrentStripe = currentStripeInfo.numberofrows(); if (sargsApplier) { - // read row group statistics and bloom filters of current stripe - loadStripeIndex(); - - // select row groups to read in the current stripe - sargsApplier->pickRowGroups(rowsInCurrentStripe, - rowIndexes, - bloomFilterIndex); - if (sargsApplier->hasSelectedFrom(currentRowInStripe)) { - // current stripe has at least one row group matching the predicate - break; + bool isStripeNeeded = true; + if (contents->metadata) { + const auto& currentStripeStats = + contents->metadata->stripestats(static_cast<int>(currentStripe)); + // skip this stripe after stats fail to satisfy sargs + isStripeNeeded = sargsApplier->evaluateStripeStatistics(currentStripeStats); + } + + if (isStripeNeeded) { + // read row group statistics and bloom filters of current stripe + loadStripeIndex(); + + // select row groups to read in the current stripe + sargsApplier->pickRowGroups(rowsInCurrentStripe, + rowIndexes, + bloomFilterIndex); + if (sargsApplier->hasSelectedFrom(currentRowInStripe)) { + // current stripe has at least one row group matching the predicate + break; + } } else { // advance to next stripe when current stripe has no matching rows currentStripe += 1; @@ -1113,7 +1136,7 @@ namespace orc { uint64_t rowsToRead = std::min(static_cast<uint64_t>(data.capacity), rowsInCurrentStripe - currentRowInStripe); - if (sargsApplier) { + if (sargsApplier && rowsToRead > 0) { rowsToRead = computeBatchSize(rowsToRead, currentRowInStripe, rowsInCurrentStripe, diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh index 54dbe339b..0facafc44 100644 --- a/c++/src/Reader.hh +++ b/c++/src/Reader.hh @@ -66,6 +66,7 @@ namespace orc { /// Decimal64 in ORCv2 uses RLE to store values. This flag indicates whether /// this new encoding is used. 
bool isDecimalAsLong; + std::unique_ptr<proto::Metadata> metadata; }; proto::StripeFooter getStripeFooter(const proto::StripeInformation& info, @@ -254,7 +255,6 @@ namespace orc { std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const; // metadata - mutable std::unique_ptr<proto::Metadata> metadata; mutable bool isMetadataLoaded; public: /** diff --git a/c++/src/sargs/SargsApplier.cc b/c++/src/sargs/SargsApplier.cc index f99499e77..b4a736753 100644 --- a/c++/src/sargs/SargsApplier.cc +++ b/c++/src/sargs/SargsApplier.cc @@ -46,7 +46,9 @@ namespace orc { , mSearchArgument(searchArgument) , mRowIndexStride(rowIndexStride) , mWriterVersion(writerVersion) - , mStats(0, 0) { + , mStats(0, 0) + , mHasEvaluatedFileStats(false) + , mFileStatsEvalResult(true) { const SearchArgumentImpl * sargs = dynamic_cast<const SearchArgumentImpl *>(mSearchArgument); @@ -122,4 +124,53 @@ namespace orc { return mHasSelected; } + bool SargsApplier::evaluateColumnStatistics( + const PbColumnStatistics& colStats) const { + const SearchArgumentImpl * sargs = + dynamic_cast<const SearchArgumentImpl *>(mSearchArgument); + if (sargs == nullptr) { + throw InvalidArgument("Failed to cast to SearchArgumentImpl"); + } + + const std::vector<PredicateLeaf>& leaves = sargs->getLeaves(); + std::vector<TruthValue> leafValues( + leaves.size(), TruthValue::YES_NO_NULL); + + for (size_t pred = 0; pred != leaves.size(); ++pred) { + uint64_t columnId = mFilterColumns[pred]; + if (columnId != INVALID_COLUMN_ID && + colStats.size() > static_cast<int>(columnId)) { + leafValues[pred] = leaves[pred].evaluate( + mWriterVersion, colStats.Get(static_cast<int>(columnId)), nullptr); + } + } + + return isNeeded(mSearchArgument->evaluate(leafValues)); + } + + bool SargsApplier::evaluateStripeStatistics( + const proto::StripeStatistics& stripeStats) { + if (stripeStats.colstats_size() == 0) { + return true; + } + + bool ret = evaluateColumnStatistics(stripeStats.colstats()); + if (!ret) { + // reset 
mRowGroups when the current stripe does not satisfy the PPD + mRowGroups.clear(); + } + return ret; + } + + bool SargsApplier::evaluateFileStatistics(const proto::Footer& footer) { + if (!mHasEvaluatedFileStats) { + if (footer.statistics_size() == 0) { + mFileStatsEvalResult = true; + } else { + mFileStatsEvalResult = evaluateColumnStatistics(footer.statistics()); + } + mHasEvaluatedFileStats = true; + } + return mFileStatsEvalResult; + } } diff --git a/c++/src/sargs/SargsApplier.hh b/c++/src/sargs/SargsApplier.hh index 39650f1fa..1842828d5 100644 --- a/c++/src/sargs/SargsApplier.hh +++ b/c++/src/sargs/SargsApplier.hh @@ -37,6 +37,18 @@ namespace orc { uint64_t rowIndexStride, WriterVersion writerVersion); + /** + * Evaluate search argument on file statistics + * @return true if file statistics satisfy the sargs + */ + bool evaluateFileStatistics(const proto::Footer& footer); + + /** + * Evaluate search argument on stripe statistics + * @return true if stripe statistics satisfy the sargs + */ + bool evaluateStripeStatistics(const proto::StripeStatistics& stripeStats); + /** * TODO: use proto::RowIndex and proto::BloomFilter to do the evaluation * Pick the row groups that we need to load from the current stripe. 
@@ -81,6 +93,11 @@ namespace orc { } private: + // evaluate column statistics in the form of protobuf::RepeatedPtrField + typedef ::google::protobuf::RepeatedPtrField<proto::ColumnStatistics> + PbColumnStatistics; + bool evaluateColumnStatistics(const PbColumnStatistics& colStats) const; + friend class TestSargsApplier_findColumnTest_Test; friend class TestSargsApplier_findArrayColumnTest_Test; friend class TestSargsApplier_findMapColumnTest_Test; @@ -101,6 +118,9 @@ namespace orc { bool mHasSkipped; // keep stats of selected RGs and evaluated RGs std::pair<uint64_t, uint64_t> mStats; + // store result of file stats evaluation + bool mHasEvaluatedFileStats; + bool mFileStatsEvalResult; }; } diff --git a/c++/test/TestPredicatePushdown.cc b/c++/test/TestPredicatePushdown.cc index 41d0b532f..6bd81f091 100644 --- a/c++/test/TestPredicatePushdown.cc +++ b/c++/test/TestPredicatePushdown.cc @@ -238,4 +238,89 @@ namespace orc { TestNoRowsSelected(reader.get()); TestOrPredicates(reader.get()); } + + void TestNoRowsSelectedWithFileStats(Reader* reader) { + std::unique_ptr<SearchArgument> sarg = + SearchArgumentFactory::newBuilder() + ->startAnd() + .lessThan("col1", PredicateDataType::LONG, + Literal(static_cast<int64_t>(0))) + .end() + .build(); + + RowReaderOptions rowReaderOpts; + rowReaderOpts.searchArgument(std::move(sarg)); + auto rowReader = reader->createRowReader(rowReaderOpts); + + auto readBatch = rowReader->createRowBatch(2000); + EXPECT_EQ(false, rowReader->next(*readBatch)); + } + + void TestSelectedWithStripeStats(Reader* reader) { + std::unique_ptr<SearchArgument> sarg = + SearchArgumentFactory::newBuilder() + ->startAnd() + .between("col1", + PredicateDataType::LONG, + Literal(static_cast<int64_t>(3500)), + Literal(static_cast<int64_t>(7000))) + .end() + .build(); + + RowReaderOptions rowReaderOpts; + rowReaderOpts.searchArgument(std::move(sarg)); + auto rowReader = reader->createRowReader(rowReaderOpts); + + auto readBatch = rowReader->createRowBatch(2000); 
+ EXPECT_EQ(true, rowReader->next(*readBatch)); + // test previous row number + EXPECT_EQ(3500, rowReader->getRowNumber()); + EXPECT_EQ(2000, readBatch->numElements); + auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch); + auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]); + for (uint64_t i = 0; i < 2000; ++i) { + EXPECT_EQ(i + 3500 , batch1.data[i]); + } + } + + TEST(TestPredicatePushdown, testStripeAndFileStats) { + MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE); + MemoryPool * pool = getDefaultPool(); + auto type = std::unique_ptr<Type>(Type::buildTypeFromString( + "struct<col1:bigint>")); + WriterOptions options; + options.setStripeSize(1) + .setCompressionBlockSize(1024) + .setCompression(CompressionKind_NONE) + .setMemoryPool(pool) + .setRowIndexStride(1000); + + auto writer = createWriter(*type, &memStream, options); + auto batch = writer->createRowBatch(3500); + auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch); + auto& longBatch = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]); + + // stripe 1 : 0 <= col1 < 3500 + // stripe 2 : 3500<= col1 < 7000 + uint64_t stripeCount = 2; + for (uint64_t currentStripe = 0; currentStripe < stripeCount; ++currentStripe) { + for (uint64_t i = 0; i < 3500; ++i) { + longBatch.data[i] = static_cast<int64_t>(i + currentStripe * 3500); + } + structBatch.numElements = 3500; + longBatch.numElements = 3500; + writer->add(*batch); + } + writer->close(); + std::unique_ptr<InputStream> inStream(new MemoryInputStream ( + memStream.getData(), memStream.getLength())); + ReaderOptions readerOptions; + readerOptions.setMemoryPool(*pool); + std::unique_ptr<Reader> reader = createReader(std::move(inStream), readerOptions); + EXPECT_EQ(7000, reader->getNumberOfRows()); + EXPECT_EQ(stripeCount, reader->getNumberOfStripes()); + + TestNoRowsSelectedWithFileStats(reader.get()); + TestSelectedWithStripeStats(reader.get()); + } } // namespace orc diff --git a/c++/test/TestSargsApplier.cc 
b/c++/test/TestSargsApplier.cc index 151635897..2ec8c3cb8 100644 --- a/c++/test/TestSargsApplier.cc +++ b/c++/test/TestSargsApplier.cc @@ -120,4 +120,75 @@ namespace orc { EXPECT_EQ(true, rowgroups[3]); } + TEST(TestSargsApplier, testStripeAndFileStats) { + auto type = std::unique_ptr<Type>( + Type::buildTypeFromString("struct<x:int,y:int>")); + auto sarg = SearchArgumentFactory::newBuilder() + ->startAnd() + .equals( + "x", + PredicateDataType::LONG, + Literal(static_cast<int64_t>(20))) + .equals( + "y", + PredicateDataType::LONG, + Literal(static_cast<int64_t>(40))) + .end() + .build(); + // Test stripe stats 0 <= x <= 10 and 0 <= y <= 50 + { + orc::proto::StripeStatistics stripeStats; + proto::ColumnStatistics structStatistics; + structStatistics.set_hasnull(false); + *stripeStats.add_colstats() = structStatistics; + *stripeStats.add_colstats() = createIntStats(0L, 10L); + *stripeStats.add_colstats() = createIntStats(0L, 50L); + SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135); + EXPECT_FALSE(applier.evaluateStripeStatistics(stripeStats)); + } + // Test stripe stats 0 <= x <= 50 and 0 <= y <= 50 + { + orc::proto::StripeStatistics stripeStats; + proto::ColumnStatistics structStatistics; + structStatistics.set_hasnull(false); + *stripeStats.add_colstats() = structStatistics; + *stripeStats.add_colstats() = createIntStats(0L, 50L); + *stripeStats.add_colstats() = createIntStats(0L, 50L); + SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135); + EXPECT_TRUE(applier.evaluateStripeStatistics(stripeStats)); + } + // Test file stats 0 <= x <= 10 and 0 <= y <= 50 + { + orc::proto::Footer footer; + proto::ColumnStatistics structStatistics; + structStatistics.set_hasnull(false); + *footer.add_statistics() = structStatistics; + *footer.add_statistics() = createIntStats(0L, 10L); + *footer.add_statistics() = createIntStats(0L, 50L); + SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135); + 
EXPECT_FALSE(applier.evaluateFileStatistics(footer)); + } + // Test file stats 0 <= x <= 50 and 0 <= y <= 30 + { + orc::proto::Footer footer; + proto::ColumnStatistics structStatistics; + structStatistics.set_hasnull(false); + *footer.add_statistics() = structStatistics; + *footer.add_statistics() = createIntStats(0L, 50L); + *footer.add_statistics() = createIntStats(0L, 30L); + SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135); + EXPECT_FALSE(applier.evaluateFileStatistics(footer)); + } + // Test file stats 0 <= x <= 50 and 0 <= y <= 50 + { + orc::proto::Footer footer; + proto::ColumnStatistics structStatistics; + structStatistics.set_hasnull(false); + *footer.add_statistics() = structStatistics; + *footer.add_statistics() = createIntStats(0L, 50L); + *footer.add_statistics() = createIntStats(0L, 50L); + SargsApplier applier(*type, sarg.get(), 1000, WriterVersion_ORC_135); + EXPECT_TRUE(applier.evaluateFileStatistics(footer)); + } + } } // namespace orc