This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 217fb0bc8 ORC-1159: [C++] Fix crash when the last stripe is skipped
217fb0bc8 is described below

commit 217fb0bc80c2fb2320062df5f449e9a65380a720
Author: Quanlong Huang <[email protected]>
AuthorDate: Tue May 3 10:29:52 2022 +0800

    ORC-1159: [C++] Fix crash when the last stripe is skipped
    
    This closes #1099
---
 c++/src/Reader.cc                 |  32 ++++++----
 c++/src/Reader.hh                 |   1 +
 c++/test/TestPredicatePushdown.cc | 121 ++++++++++++++++++++++++++++++++------
 3 files changed, 125 insertions(+), 29 deletions(-)

diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 810b01d7b..e53dc4c86 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1035,6 +1035,20 @@ namespace orc {
     return memory + decompressorMemory ;
   }
 
+  // Update fields to indicate we've reached the end of file
+  void RowReaderImpl::markEndOfFile() {
+    currentStripe = lastStripe;
+    currentRowInStripe = 0;
+    rowsInCurrentStripe = 0;
+    if (lastStripe == 0) {
+      // Empty file
+      previousRow = 0;
+    } else {
+      previousRow = firstRowOfStripe[lastStripe - 1] +
+          footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
+    }
+  }
+
   void RowReaderImpl::startNextStripe() {
     reader.reset(); // ColumnReaders use lots of memory; free old memory first
     rowIndexes.clear();
@@ -1043,9 +1057,7 @@ namespace orc {
     // evaluate file statistics if it exists
     if (sargsApplier && !sargsApplier->evaluateFileStatistics(*footer)) {
       // skip the entire file
-      currentStripe = lastStripe;
-      currentRowInStripe = 0;
-      rowsInCurrentStripe = 0;
+      markEndOfFile();
       return;
     }
 
@@ -1120,18 +1132,16 @@ namespace orc {
           seekToRowGroup(static_cast<uint32_t>(currentRowInStripe / 
footer->rowindexstride()));
         }
       }
+    } else {
+      // All remaining stripes are skipped.
+      markEndOfFile();
     }
   }
 
   bool RowReaderImpl::next(ColumnVectorBatch& data) {
     if (currentStripe >= lastStripe) {
       data.numElements = 0;
-      if (lastStripe > 0) {
-        previousRow = firstRowOfStripe[lastStripe - 1] +
-          footer->stripes(static_cast<int>(lastStripe - 1)).numberofrows();
-      } else {
-        previousRow = 0;
-      }
+      markEndOfFile();
       return false;
     }
     if (currentRowInStripe == 0) {
@@ -1149,9 +1159,7 @@ namespace orc {
     }
     data.numElements = rowsToRead;
     if (rowsToRead == 0) {
-      previousRow = lastStripe <= 0 ? footer->numberofrows() :
-                    firstRowOfStripe[lastStripe - 1] +
-                    footer->stripes(static_cast<int>(lastStripe - 
1)).numberofrows();
+      markEndOfFile();
       return false;
     }
     if (enableEncodedBlock) {
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 255bb8d25..ffaff4176 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -155,6 +155,7 @@ namespace orc {
     bool enableEncodedBlock;
     // internal methods
     void startNextStripe();
+    inline void markEndOfFile();
 
     // row index of current stripe with column id as the key
     std::unordered_map<uint64_t, proto::RowIndex> rowIndexes;
diff --git a/c++/test/TestPredicatePushdown.cc 
b/c++/test/TestPredicatePushdown.cc
index 926b6ae8a..f12db4e9b 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -386,7 +386,9 @@ namespace orc {
     TestMultipleSeeksWithoutRowIndexes(reader.get(), false);
   }
 
-  void TestNoRowsSelectedWithFileStats(Reader* reader) {
+  // Test Sarg skips the whole file based on file stats.
+  // Seeking to 'seekRowNumber' (if it's non-negative) before reads.
+  void TestNoRowsSelectedWithFileStats(Reader* reader, int seekRowNumber) {
     std::unique_ptr<SearchArgument> sarg =
       SearchArgumentFactory::newBuilder()
         ->startAnd()
@@ -400,34 +402,114 @@ namespace orc {
     auto rowReader = reader->createRowReader(rowReaderOpts);
 
     auto readBatch = rowReader->createRowBatch(2000);
+    if (seekRowNumber >= 0) {
+      rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
+    }
     EXPECT_EQ(false, rowReader->next(*readBatch));
+    EXPECT_EQ(7000, rowReader->getRowNumber());
   }
 
-  void TestSelectedWithStripeStats(Reader* reader) {
+  void TestLastStripeSelectedWithStripeStats(Reader* reader, int 
seekRowNumber) {
+    // Sargs: col1 between 3500 and 7000. First stripe (3500 rows) will be 
skipped.
     std::unique_ptr<SearchArgument> sarg =
       SearchArgumentFactory::newBuilder()
-          ->startAnd()
-          .between("col1",
-                   PredicateDataType::LONG,
-                   Literal(static_cast<int64_t>(3500)),
-                   Literal(static_cast<int64_t>(7000)))
-          .end()
+          ->between("col1",
+                    PredicateDataType::LONG,
+                    Literal(static_cast<int64_t>(3500)),
+                    Literal(static_cast<int64_t>(7000)))
           .build();
 
     RowReaderOptions rowReaderOpts;
     rowReaderOpts.searchArgument(std::move(sarg));
     auto rowReader = reader->createRowReader(rowReaderOpts);
 
-    auto readBatch = rowReader->createRowBatch(2000);
-    EXPECT_EQ(true, rowReader->next(*readBatch));
-    // test previous row number
-    EXPECT_EQ(3500, rowReader->getRowNumber());
-    EXPECT_EQ(2000, readBatch->numElements);
+    if (seekRowNumber >= 0) {
+      rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
+    }
+    // Seek within the first stripe which is skipped due to PPD. Any seeks 
within it
+    // will go to the end of the first stripe.
+    if (seekRowNumber < 3500) {
+      auto readBatch = rowReader->createRowBatch(2000);
+      // 1st batch of 2000 rows
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      // test previous row number
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+      EXPECT_EQ(2000, readBatch->numElements);
+      auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+      auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+      for (uint64_t i = 0; i < 2000; ++i) {
+        EXPECT_EQ(i + 3500 , batch1.data[i]);
+      }
+
+      // 2nd batch of the remaining 1500 rows
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      // test previous row number
+      EXPECT_EQ(5500, rowReader->getRowNumber());
+      EXPECT_EQ(1500, readBatch->numElements);
+      for (uint64_t i = 0; i < 1500; ++i) {
+        EXPECT_EQ(i + 5500 , batch1.data[i]);
+      }
+      // no more batches
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+      return;
+    }
+
+    // Seek to the end of file
+    if (seekRowNumber >= 7000) {
+      auto readBatch = rowReader->createRowBatch(2000);
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+      EXPECT_EQ(7000, rowReader->getRowNumber());
+      return;
+    }
+
+    {
+      // Seek within the second stripe. Use 3500 as the batch size so we can 
read all rows
+      // at once.
+      auto readBatch = rowReader->createRowBatch(3500);
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      EXPECT_EQ(seekRowNumber, rowReader->getRowNumber());
+      EXPECT_EQ(7000 - seekRowNumber, readBatch->numElements);
+      auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+      auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+      for (uint64_t i = 0; i < readBatch->numElements; ++i) {
+        EXPECT_EQ(i + static_cast<unsigned long>(seekRowNumber), 
batch1.data[i]);
+      }
+      // no more batches
+      EXPECT_EQ(false, rowReader->next(*readBatch));
+    }
+  }
+
+  void TestFirstStripeSelectedWithStripeStats(Reader* reader, int 
seekRowNumber) {
+    // Sargs: col1 < 3500. Last stripe (3500 rows) will be skipped.
+    std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->lessThan("col1",
+                   PredicateDataType::LONG,
+                   Literal(static_cast<int64_t>(3500)))
+        .build();
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.searchArgument(std::move(sarg));
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+
+    auto readBatch = rowReader->createRowBatch(3500);
     auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
     auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
-    for (uint64_t i = 0; i < 2000; ++i) {
-      EXPECT_EQ(i + 3500 , batch1.data[i]);
+
+    uint64_t firstRowNumber = 0;
+    if (seekRowNumber >= 0) {
+      rowReader->seekToRow(static_cast<uint64_t>(seekRowNumber));
+      firstRowNumber = static_cast<uint64_t>(seekRowNumber);
     }
+    if (seekRowNumber < 3500) {
+      EXPECT_EQ(true, rowReader->next(*readBatch));
+      EXPECT_EQ(firstRowNumber, rowReader->getRowNumber());
+      EXPECT_EQ(3500 - firstRowNumber, readBatch->numElements);
+      for (uint64_t i = 0; i < readBatch->numElements; ++i) {
+        EXPECT_EQ(i + firstRowNumber, batch1.data[i]);
+      }
+    }
+    // no more batches
+    EXPECT_EQ(false, rowReader->next(*readBatch));
+    EXPECT_EQ(7000, rowReader->getRowNumber());
   }
 
   TEST(TestPredicatePushdown, testStripeAndFileStats) {
@@ -467,7 +549,12 @@ namespace orc {
     EXPECT_EQ(7000, reader->getNumberOfRows());
     EXPECT_EQ(stripeCount, reader->getNumberOfStripes());
 
-    TestNoRowsSelectedWithFileStats(reader.get());
-    TestSelectedWithStripeStats(reader.get());
+    // Seek to different positions before each test. -1 means no seek.
+    int seekRowNumber[] = {-1, 0, 1000, 4000, 8000};
+    for (int pos : seekRowNumber) {
+      TestNoRowsSelectedWithFileStats(reader.get(), pos);
+      TestLastStripeSelectedWithStripeStats(reader.get(), pos);
+      TestFirstStripeSelectedWithStripeStats(reader.get(), pos);
+    }
   }
 }  // namespace orc

Reply via email to