This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new ea50fb250 ORC-1160: [C++] Fix seekToRow can't seek within selected row 
group
ea50fb250 is described below

commit ea50fb25041897f35d4fb157cb4b3ebb65b9c56d
Author: Quanlong Huang <[email protected]>
AuthorDate: Thu Apr 28 15:34:15 2022 +0800

    ORC-1160: [C++] Fix seekToRow can't seek within selected row group
    
    This fixes #1100
---
 c++/src/Reader.cc                 |  32 ++++----
 c++/test/TestPredicatePushdown.cc | 155 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 169 insertions(+), 18 deletions(-)

diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 580326726..810b01d7b 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -396,26 +396,28 @@ namespace orc {
     previousRow = rowNumber;
     startNextStripe();
 
-    // when predicate push down is enabled, above call to startNextStripe()
-    // will move current row to 1st matching row group; here we only need
-    // to deal with the case when PPD is not enabled.
-    if (!sargsApplier) {
-      uint64_t rowsToSkip = currentRowInStripe;
-
-      if (footer->rowindexstride() > 0 &&
-          currentStripeInfo.indexlength() > 0) {
+    uint64_t rowsToSkip = currentRowInStripe;
+    auto rowIndexStride = footer->rowindexstride();
+    // seek to the target row group if row indexes exists
+    if (rowIndexStride > 0 && currentStripeInfo.indexlength() > 0) {
+      // when predicate push down is enabled, above call to startNextStripe()
+      // will move current row to 1st matching row group; here we only need
+      // to deal with the case when PPD is not enabled.
+      if (!sargsApplier) {
         if (rowIndexes.empty()) {
           loadStripeIndex();
         }
-        uint32_t rowGroupId =
-          static_cast<uint32_t>(currentRowInStripe / footer->rowindexstride());
-        rowsToSkip -= static_cast<uint64_t>(rowGroupId) * 
footer->rowindexstride();
-
+        auto rowGroupId = static_cast<uint32_t>(rowsToSkip / rowIndexStride);
         if (rowGroupId != 0) {
           seekToRowGroup(rowGroupId);
         }
       }
-
+      // skip leading rows in the target row group
+      rowsToSkip %= rowIndexStride;
+    }
+    // 'reader' is reset in startNextStripe(). It could be nullptr if 
'rowsToSkip' is 0,
+    // e.g. when startNextStripe() skips all remaining rows of the file.
+    if (rowsToSkip > 0) {
       reader->skip(rowsToSkip);
     }
   }
@@ -1083,7 +1085,9 @@ namespace orc {
             // current stripe has at least one row group matching the predicate
             break;
           }
-        } else {
+          isStripeNeeded = false;
+        }
+        if (!isStripeNeeded) {
           // advance to next stripe when current stripe has no matching rows
           currentStripe += 1;
           currentRowInStripe = 0;
diff --git a/c++/test/TestPredicatePushdown.cc 
b/c++/test/TestPredicatePushdown.cc
index 6bd81f091..926b6ae8a 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -26,7 +26,7 @@ namespace orc {
 
   static const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
 
-  void createMemTestFile(MemoryOutputStream& memStream) {
+  void createMemTestFile(MemoryOutputStream& memStream, uint64_t 
rowIndexStride) {
     MemoryPool * pool = getDefaultPool();
     auto type = std::unique_ptr<Type>(Type::buildTypeFromString(
       "struct<int1:bigint,string1:string>"));
@@ -35,7 +35,7 @@ namespace orc {
       .setCompressionBlockSize(1024)
       .setCompression(CompressionKind_NONE)
       .setMemoryPool(pool)
-      .setRowIndexStride(1000);
+      .setRowIndexStride(rowIndexStride);
 
     auto writer = createWriter(*type, &memStream, options);
     auto batch = writer->createRowBatch(3500);
@@ -223,10 +223,96 @@ namespace orc {
     }
   }
 
+  void TestSeekWithPredicates(Reader* reader, uint64_t seekRowNumber) {
+    // Build search argument (x < 300000) for column 'int1'. Only the first 
row group
+    // will be selected. It has 1000 rows: (0, "0"), (300, "10"), (600, "20"), 
...,
+    // (299700, "9990").
+    std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->lessThan("int1", PredicateDataType::LONG,
+                   Literal(static_cast<int64_t>(300000)))
+        .build();
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.searchArgument(std::move(sarg));
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+    auto readBatch = rowReader->createRowBatch(2000);
+    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+    auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+    rowReader->seekToRow(seekRowNumber);
+    if (seekRowNumber >= 1000) {
+      // Seek advance the first row group will go to the end of file
+      EXPECT_FALSE(rowReader->next(*readBatch));
+      EXPECT_EQ(0, readBatch->numElements);
+      EXPECT_EQ(3500, rowReader->getRowNumber());
+      return;
+    }
+    EXPECT_TRUE(rowReader->next(*readBatch));
+    EXPECT_EQ(1000 - seekRowNumber, readBatch->numElements);
+    EXPECT_EQ(seekRowNumber, rowReader->getRowNumber());
+    for (uint64_t i = 0; i < readBatch->numElements; ++i) {
+      EXPECT_EQ(300 * (i + seekRowNumber), batch1.data[i]);
+      EXPECT_EQ(std::to_string(10 * (i + seekRowNumber)),
+                std::string(batch2.data[i], 
static_cast<size_t>(batch2.length[i])));
+    }
+    EXPECT_FALSE(rowReader->next(*readBatch));
+    EXPECT_EQ(3500, rowReader->getRowNumber());
+  }
+
+  void TestMultipleSeeksWithPredicates(Reader* reader) {
+    // Build search argument (x >= 300000 AND x < 600000) for column 'int1'. 
Only the 2nd
+    // row group will be selected.
+    std::unique_ptr<SearchArgument> sarg = SearchArgumentFactory::newBuilder()
+        ->startAnd()
+        .startNot()
+        .lessThan("int1", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(300000L)))
+        .end()
+        .lessThan("int1", PredicateDataType::LONG,
+                  Literal(static_cast<int64_t>(600000L)))
+        .end()
+        .build();
+    RowReaderOptions rowReaderOpts;
+    rowReaderOpts.searchArgument(std::move(sarg));
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+
+    // Read only one row after each seek
+    auto readBatch = rowReader->createRowBatch(1);
+    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+    auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+    // Seek within the 1st row group will go to the start of the 2nd row group
+    rowReader->seekToRow(10);
+    EXPECT_TRUE(rowReader->next(*readBatch));
+    EXPECT_EQ(1000, rowReader->getRowNumber()) << "Should start at the 2nd row 
group";
+    EXPECT_EQ(1, readBatch->numElements);
+    EXPECT_EQ(300000, batch1.data[0]);
+    EXPECT_EQ("10000", std::string(batch2.data[0], 
static_cast<size_t>(batch2.length[0])));
+
+    // Seek within the 2nd row group (1000 rows) which is selected by the 
search argument
+    uint64_t seekRowNum[] = {1001, 1010, 1100, 1500, 1999};
+    for (uint64_t pos : seekRowNum) {
+      rowReader->seekToRow(pos);
+      EXPECT_TRUE(rowReader->next(*readBatch));
+      EXPECT_EQ(pos, rowReader->getRowNumber());
+      EXPECT_EQ(1, readBatch->numElements);
+      EXPECT_EQ(300 * pos, batch1.data[0]);
+      EXPECT_EQ(std::to_string(10 * pos),
+                std::string(batch2.data[0], 
static_cast<size_t>(batch2.length[0])));
+    }
+
+    // Seek advance the 2nd row group will go to the end of file
+    rowReader->seekToRow(2000);
+    EXPECT_FALSE(rowReader->next(*readBatch));
+    EXPECT_EQ(3500, rowReader->getRowNumber());
+    EXPECT_EQ(0, readBatch->numElements);
+  }
+
   TEST(TestPredicatePushdown, testPredicatePushdown) {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
-    MemoryPool * pool = getDefaultPool();
-    createMemTestFile(memStream);
+    MemoryPool* pool = getDefaultPool();
+    createMemTestFile(memStream, 1000);
     std::unique_ptr<InputStream> inStream(new MemoryInputStream (
       memStream.getData(), memStream.getLength()));
     ReaderOptions readerOptions;
@@ -237,6 +323,67 @@ namespace orc {
     TestRangePredicates(reader.get());
     TestNoRowsSelected(reader.get());
     TestOrPredicates(reader.get());
+
+    uint64_t seekRowNumbers[] = {0, 10, 100, 500, 999, 1000, 1001, 4000};
+    for (uint64_t seekRowNumber : seekRowNumbers) {
+      TestSeekWithPredicates(reader.get(), seekRowNumber);
+    }
+
+    TestMultipleSeeksWithPredicates(reader.get());
+  }
+
+  void TestMultipleSeeksWithoutRowIndexes(Reader* reader, bool createSarg) {
+    RowReaderOptions rowReaderOpts;
+    if (createSarg) {
+      // Build search argument x < 300000 for column 'int1'. All rows will be 
selected
+      // since there are no row indexes in the file.
+      std::unique_ptr<SearchArgument> sarg = 
SearchArgumentFactory::newBuilder()
+          ->lessThan("int1", PredicateDataType::LONG,
+                     Literal(static_cast<int64_t>(300000L)))
+          .build();
+      rowReaderOpts.searchArgument(std::move(sarg));
+    }
+    auto rowReader = reader->createRowReader(rowReaderOpts);
+
+    // Read only one row after each seek
+    auto readBatch = rowReader->createRowBatch(1);
+    auto& batch0 = dynamic_cast<StructVectorBatch&>(*readBatch);
+    auto& batch1 = dynamic_cast<LongVectorBatch&>(*batch0.fields[0]);
+    auto& batch2 = dynamic_cast<StringVectorBatch&>(*batch0.fields[1]);
+
+    // Seeks within the file
+    uint64_t seekRowNum[] = {0, 1, 100, 999, 1001, 1010, 1100, 1500, 1999, 
3000, 3499};
+    for (uint64_t pos : seekRowNum) {
+      rowReader->seekToRow(pos);
+      EXPECT_TRUE(rowReader->next(*readBatch));
+      EXPECT_EQ(pos, rowReader->getRowNumber());
+      EXPECT_EQ(1, readBatch->numElements);
+      EXPECT_EQ(300 * pos, batch1.data[0]);
+      EXPECT_EQ(std::to_string(10 * pos),
+                std::string(batch2.data[0], 
static_cast<size_t>(batch2.length[0])));
+    }
+
+    // Seek advance the end of file
+    rowReader->seekToRow(4000);
+    EXPECT_FALSE(rowReader->next(*readBatch));
+    EXPECT_EQ(3500, rowReader->getRowNumber());
+    EXPECT_EQ(0, readBatch->numElements);
+  }
+
+  TEST(TestPredicatePushdown, testPredicatePushdownWithoutRowIndexes) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    // Create the file with rowIndexStride=0, so there are no row groups or 
row indexes.
+    createMemTestFile(memStream, 0);
+    std::unique_ptr<InputStream> inStream(new MemoryInputStream (
+      memStream.getData(), memStream.getLength()));
+    ReaderOptions readerOptions;
+    readerOptions.setMemoryPool(*pool);
+    std::unique_ptr<Reader> reader = createReader(std::move(inStream), 
readerOptions);
+    EXPECT_EQ(3500, reader->getNumberOfRows());
+
+    TestMultipleSeeksWithoutRowIndexes(reader.get(), true);
+    TestMultipleSeeksWithoutRowIndexes(reader.get(), false);
   }
 
   void TestNoRowsSelectedWithFileStats(Reader* reader) {

Reply via email to