This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 217fb0bc8 ORC-1159: [C++] Fix crash when the last stripe is skipped
217fb0bc8 is described below
commit 217fb0bc80c2fb2320062df5f449e9a65380a720
Author: Quanlong Huang <[email protected]>
AuthorDate: Tue May 3 10:29:52 2022 +0800
ORC-1159: [C++] Fix crash when the last stripe is skipped
This closes #1099
---
c++/src/Reader.cc | 32 ++++++----
c++/src/Reader.hh | 1 +
c++/test/TestPredicatePushdown.cc | 121 ++++++++++++++++++++++++++++++++------
3 files changed, 125 insertions(+), 29 deletions(-)
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 810b01d7b..e53dc4c86 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1035,6 +1035,20 @@ namespace orc {
return memory + decompressorMemory ;
}
+ // Update fields to indicate we've reached the end of file
+ void RowReaderImpl::markEndOfFile() {
+ currentStripe = lastStripe;
+ currentRowInStripe = 0;
+ rowsInCurrentStripe = 0;
+ if (lastStripe == 0) {
+ // Empty file
+ previousRow = 0;
+ } else {
+ previousRow = firstRowOfStripe[lastStripe - 1] +
+ footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
+ }
+ }
+
void RowReaderImpl::startNextStripe() {
reader.reset(); // ColumnReaders use lots of memory; free old memory first
rowIndexes.clear();
@@ -1043,9 +1057,7 @@ namespace orc {
// evaluate file statistics if it exists
if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer)) {
// skip the entire file
- currentStripe = lastStripe;
- currentRowInStripe = 0;
- rowsInCurrentStripe = 0;
+ markEndOfFile();
return;
}
@@ -1120,18 +1132,16 @@ namespace orc {
seekToRowGroup(static_cast<uint32_t>(currentRowInStripe /
footer->rowindexstride()));
}
}
+ } else {
+ // All remaining stripes are skipped.
+ markEndOfFile();
}
}
bool RowReaderImpl::next(ColumnVectorBatch& data) {
if (currentStripe >= lastStripe) {
data.numElements = 0;
- if (lastStripe > 0) {
- previousRow = firstRowOfStripe[lastStripe - 1] +
- footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
- } else {
- previousRow = 0;
- }
+ markEndOfFile();
return false;
}
if (currentRowInStripe == 0) {
@@ -1149,9 +1159,7 @@ namespace orc {
}
data.numElements = rowsToRead;
if (rowsToRead == 0) {
- previousRow = lastStripe <= 0 ? footer->numberofrows() :
- firstRowOfStripe[lastStripe - 1] +
- footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
+ markEndOfFile();
return false;
}
if (enableEncodedBlock) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 255bb8d25..ffaff4176 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -155,6 +155,7 @@ namespace orc {
bool enableEncodedBlock;
// internal methods
void startNextStripe();
+ inline void markEndOfFile();
// row index of current stripe with column id as the key
std::unordered_map<uint64_t, proto::RowIndex> rowIndexes;
diff --git a/c++/test/TestPredicatePushdown.cc b/c++/test/TestPredicatePushdown.cc
index 926b6ae8a..f12db4e9b 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -386,7 +386,9 @@ namespace orc {
TestMultipleSeeksWithoutRowIndexes(reader.get(), false);
}
- void TestNoRowsSelectedWithFileStats(Reader* reader) {
+ // Test Sarg skips the whole file based on file stats.
+ // Seeking to 'seekRowNumber' (if it's non-negative) before reads.
+ void TestNoRowsSelectedWithFileStats(Reader* reader, int seekRowNumber) {
std::unique_ptr<SearchArgument> sarg =
SearchArgumentFactory::newBuilder()
->startAnd()
@@ -400,34 +402,114 @@ namespace orc {
auto rowReader = reader->createRowReader(rowReaderOpts);
auto readBatch = rowReader->createRowBatch(2000);
+ if (seekRowNumber >= 0) {
+ rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
+ }
EXPECT_EQ(false, rowReader->next(*readBatch));
+ EXPECT_EQ(7000, rowReader->getRowNumber());
}
- void TestSelectedWithStripeStats(Reader* reader) {
+ void TestLastStripeSelectedWithStripeStats(Reader* reader, int seekRowNumber) {
+ // Sargs: col1 between 3500 and 7000. First stripe (3500 rows) will be skipped.
std::unique_ptr<SearchArgument> sarg =
SearchArgumentFactory::newBuilder()
- ->startAnd()
- .between("col1",
- PredicateDataType::LONG,
- Literal(static_cast<int64_t>(3500)),
- Literal(static_cast<int64_t>(7000)))
- .end()
+ ->between("col1",
+ PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(3500)),
+ Literal(static_cast<int64_t>(7000)))
.build();
RowReaderOptions rowReaderOpts;
rowReaderOpts.searchArgument(std::move(sarg));
auto rowReader = reader->createRowReader(rowReaderOpts);
- auto readBatch = rowReader->createRowBatch(2000);
- EXPECT_EQ(true, rowReader->next(*readBatch));
- // test previous row number
- EXPECT_EQ(3500, rowReader->getRowNumber());
- EXPECT_EQ(2000, readBatch->numElements);
+ if (seekRowNumber >= 0) {
+ rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
+ }
+ // Seek within the first stripe which is skipped due to PPD. Any seeks within it
+ // will go to the end of the first stripe.
+ if (seekRowNumber < 3500) {
+ auto readBatch = rowReader->createRowBatch(2000);
+ // 1st batch of 2000 rows
+ EXPECT_EQ(true, rowReader->next(*readBatch));
+ // test previous row number
+ EXPECT_EQ(3500, rowReader->getRowNumber());
+ EXPECT_EQ(2000, readBatch->numElements);
+ auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+ for (uint64_t i = 0; i < 2000; ++i) {
+ EXPECT_EQ(i + 3500 , batch1.data[i]);
+ }
+
+ // 2nd batch of the remaining 1500 rows
+ EXPECT_EQ(true, rowReader->next(*readBatch));
+ // test previous row number
+ EXPECT_EQ(5500, rowReader->getRowNumber());
+ EXPECT_EQ(1500, readBatch->numElements);
+ for (uint64_t i = 0; i < 1500; ++i) {
+ EXPECT_EQ(i + 5500 , batch1.data[i]);
+ }
+ // no more batches
+ EXPECT_EQ(false, rowReader->next(*readBatch));
+ return;
+ }
+
+ // Seek to the end of file
+ if (seekRowNumber >= 7000) {
+ auto readBatch = rowReader->createRowBatch(2000);
+ EXPECT_EQ(false, rowReader->next(*readBatch));
+ EXPECT_EQ(7000, rowReader->getRowNumber());
+ return;
+ }
+
+ {
+ // Seek within the second stripe. Use 3500 as the batch size so we can read all rows
+ // at once.
+ auto readBatch = rowReader->createRowBatch(3500);
+ EXPECT_EQ(true, rowReader->next(*readBatch));
+ EXPECT_EQ(seekRowNumber, rowReader->getRowNumber());
+ EXPECT_EQ(7000 - seekRowNumber, readBatch->numElements);
+ auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+ auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+ for (uint64_t i = 0; i < readBatch->numElements; ++i) {
+ EXPECT_EQ(i + static_cast<unsigned long>(seekRowNumber), batch1.data[i]);
+ }
+ // no more batches
+ EXPECT_EQ(false, rowReader->next(*readBatch));
+ }
+ }
+
+ void TestFirstStripeSelectedWithStripeStats(Reader* reader, int seekRowNumber) {
+ // Sargs: col1 < 3500. Last stripe (3500 rows) will be skipped.
+ std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+ ->lessThan("col1",
+ PredicateDataType::LONG,
+ Literal(static_cast<int64_t>(3500)))
+ .build();
+ RowReaderOptions rowReaderOpts;
+ rowReaderOpts.searchArgument(std::move(sarg));
+ auto rowReader = reader->createRowReader(rowReaderOpts);
+
+ auto readBatch = rowReader->createRowBatch(3500);
auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
- for (uint64_t i = 0; i < 2000; ++i) {
- EXPECT_EQ(i + 3500 , batch1.data[i]);
+
+ uint64_t firstRowNumber = 0;
+ if (seekRowNumber >= 0) {
+ rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
+ firstRowNumber = static_cast<uint64_t>(seekRowNumber);
}
+ if (seekRowNumber < 3500) {
+ EXPECT_EQ(true, rowReader->next(*readBatch));
+ EXPECT_EQ(firstRowNumber, rowReader->getRowNumber());
+ EXPECT_EQ(3500 - firstRowNumber, readBatch->numElements);
+ for (uint64_t i = 0; i < readBatch->numElements; ++i) {
+ EXPECT_EQ(i + firstRowNumber, batch1.data[i]);
+ }
+ }
+ // no more batches
+ EXPECT_EQ(false, rowReader->next(*readBatch));
+ EXPECT_EQ(7000, rowReader->getRowNumber());
}
TEST(TestPredicatePushdown, testStripeAndFileStats) {
@@ -467,7 +549,12 @@ namespace orc {
EXPECT_EQ(7000, reader->getNumberOfRows());
EXPECT_EQ(stripeCount, reader->getNumberOfStripes());
- TestNoRowsSelectedWithFileStats(reader.get());
- TestSelectedWithStripeStats(reader.get());
+ // Seek to different positions before each test. -1 means no seek.
+ int seekRowNumber[] = {-1, 0, 1000, 4000, 8000};
+ for (int pos : seekRowNumber) {
+ TestNoRowsSelectedWithFileStats(reader.get(), pos);
+ TestLastStripeSelectedWithStripeStats(reader.get(), pos);
+ TestFirstStripeSelectedWithStripeStats(reader.get(), pos);
+ }
}
} // namespace orc