This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 67a59db2b ORC-1304: [C++] Fix seeking over empty PRESENT stream (#1519)
67a59db2b is described below

commit 67a59db2b4d7b3a241b82b2d637943c1b3b84fce
Author: Quanlong Huang <[email protected]>
AuthorDate: Thu May 25 14:36:54 2023 +0800

    ORC-1304: [C++] Fix seeking over empty PRESENT stream (#1519)
    
    ### What changes were proposed in this pull request?
    This backports ORC-1304 (#1299) to branch-1.8. Resolved some conflicts due to ReaderMetrics not being supported in branch-1.8.
    
    ### Why are the changes needed?
    The bug of ORC-1304 also occurs on branch-1.8
    
    ### How was this patch tested?
    Ran orc-test
    
    Co-authored-by: coderex2522 <[email protected]>
---
 c++/src/ByteRLE.cc         | 17 ++++----
 c++/src/RLEv1.cc           | 24 ++++++------
 c++/src/RLEv1.hh           |  2 +
 c++/test/TestByteRle.cc    | 11 ++++++
 c++/test/TestReader.cc     | 98 ++++++++++++++++++++++++++++++++++++++++++++++
 c++/test/TestRleDecoder.cc | 13 +++++-
 6 files changed, 146 insertions(+), 19 deletions(-)

diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index 1c4a64516..557fc8fd7 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -341,6 +341,7 @@ namespace orc {
     inline void nextBuffer();
     inline signed char readByte();
     inline void readHeader();
+    inline void reset();
 
     std::unique_ptr<SeekableInputStream> inputStream;
     size_t remainingValues;
@@ -380,9 +381,7 @@ namespace orc {
     }
   }
 
-  ByteRleDecoderImpl::ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream>
-                                         input) {
-    inputStream = std::move(input);
+  void ByteRleDecoderImpl::reset() {
     repeating = false;
     remainingValues = 0;
     value = 0;
@@ -390,6 +389,12 @@ namespace orc {
     bufferEnd = nullptr;
   }
 
+  ByteRleDecoderImpl::ByteRleDecoderImpl(
+                        std::unique_ptr<SeekableInputStream> input) {
+    inputStream = std::move(input);
+    reset();
+  }
+
   ByteRleDecoderImpl::~ByteRleDecoderImpl() {
     // PASS
   }
@@ -397,10 +402,8 @@ namespace orc {
   void ByteRleDecoderImpl::seek(PositionProvider& location) {
     // move the input stream
     inputStream->seek(location);
-    // force a re-read from the stream
-    bufferEnd = bufferStart;
-    // read a new header
-    readHeader();
+    // reset the decoder status and lazily call readHeader()
+    reset();
     // skip ahead the given number of records
     ByteRleDecoderImpl::skip(location.next());
   }
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index fe333978d..d80a81a56 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -190,25 +190,27 @@ void RleDecoderV1::readHeader() {
   }
 }
 
+void RleDecoderV1::reset() {
+  remainingValues = 0;
+  value = 0;
+  bufferStart = nullptr;
+  bufferEnd = nullptr;
+  delta = 0;
+  repeating = false;
+}
+
 RleDecoderV1::RleDecoderV1(std::unique_ptr<SeekableInputStream> input,
                            bool hasSigned)
     : inputStream(std::move(input)),
-      isSigned(hasSigned),
-      remainingValues(0),
-      value(0),
-      bufferStart(nullptr),
-      bufferEnd(bufferStart),
-      delta(0),
-      repeating(false) {
+      isSigned(hasSigned) {
+  reset();
 }
 
 void RleDecoderV1::seek(PositionProvider& location) {
   // move the input stream
   inputStream->seek(location);
-  // force a re-read from the stream
-  bufferEnd = bufferStart;
-  // read a new header
-  readHeader();
+  // reset the decoder status and lazily call readHeader()
+  reset();
   // skip ahead the given number of records
   skip(location.next());
 }
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index 8e31d7087..f05ab17a1 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -77,6 +77,8 @@ private:
 
     inline void skipLongs(uint64_t numValues);
 
+    inline void reset();
+
     const std::unique_ptr<SeekableInputStream> inputStream;
     const bool isSigned;
     uint64_t remainingValues;
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index 38c01d336..bdcc13c9e 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -813,6 +813,17 @@ TEST(ByteRle, testSeek) {
   } while (i != 0);
 }
 
+TEST(ByteRle, seekOverEmptyPresentStream) {
+  const char* buffer = nullptr;
+  std::unique_ptr<ByteRleDecoder> rle =
+      createByteRleDecoder(
+        std::unique_ptr<orc::SeekableInputStream>
+             (new SeekableArrayInputStream(buffer, 0, 1)));
+  std::list<uint64_t> position(2, 0);
+  PositionProvider location(position);
+  rle->seek(location);
+}
+
 TEST(BooleanRle, simpleTest) {
   const unsigned char buffer[] = {0x61, 0xf0, 0xfd, 0x55, 0xAA, 0x55};
   std::unique_ptr<SeekableInputStream> stream
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index 237deab05..7578d81a7 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -672,4 +672,102 @@ namespace orc {
     EXPECT_EQ(1, nestedUnionBatch.offsets.data()[1]);
   }
 
+  TEST(TestReadIntent, testSeekOverEmptyPresentStream) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    uint64_t rowCount = 5000;
+    {
+      auto type = std::unique_ptr<Type>(
+        Type::buildTypeFromString(
+          "struct<col1:struct<col2:int>,col3:struct<col4:int>,"
+          "col5:array<int>,col6:map<int,int>>"));
+      WriterOptions options;
+      options.setStripeSize(1024 * 1024)
+          .setCompressionBlockSize(1024)
+          .setCompression(CompressionKind_NONE)
+          .setMemoryPool(pool)
+          .setRowIndexStride(1000);
+
+      // the child columns of the col3,col5,col6 have the empty present stream
+      auto writer = createWriter(*type, &memStream, options);
+      auto batch = writer->createRowBatch(rowCount);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+      auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+      auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+      auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+      auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch2.fields[0]);
+      auto& longBatch3 = dynamic_cast<LongVectorBatch&>(*listBatch.elements);
+      auto& longKeyBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.keys);
+      auto& longValueBatch = dynamic_cast<LongVectorBatch&>(*mapBatch.elements);
+
+      structBatch.numElements = rowCount;
+      structBatch1.numElements = rowCount;
+      structBatch2.numElements = rowCount;
+      listBatch.numElements = rowCount;
+      mapBatch.numElements = rowCount;
+      longBatch1.numElements = rowCount;
+      longBatch2.numElements = rowCount;
+      longBatch3.numElements = rowCount;
+      longKeyBatch.numElements = rowCount;
+      longValueBatch.numElements = rowCount;
+
+      structBatch1.hasNulls = false;
+      structBatch2.hasNulls = true;
+      listBatch.hasNulls = true;
+      mapBatch.hasNulls = true;
+      longBatch1.hasNulls = false;
+      longBatch2.hasNulls = true;
+      longBatch3.hasNulls = true;
+      longKeyBatch.hasNulls = true;
+      longValueBatch.hasNulls = true;
+      for (uint64_t i = 0; i < rowCount; ++i) {
+        longBatch1.data[i] = static_cast<int64_t>(i);
+        longBatch1.notNull[i] = 1;
+
+        structBatch2.notNull[i] = 0;
+        listBatch.notNull[i] = 0;
+        listBatch.offsets[i] = 0;
+        mapBatch.notNull[i] = 0;
+        longBatch2.notNull[i] = 0;
+        longBatch3.notNull[i] = 0;
+        longKeyBatch.notNull[i] = 0;
+        longValueBatch.notNull[i] = 0;
+      }
+      writer->add(*batch);
+      writer->close();
+    }
+    {
+      std::unique_ptr<InputStream> inStream(
+        new MemoryInputStream(memStream.getData(), memStream.getLength()));
+      ReaderOptions readerOptions;
+      readerOptions.setMemoryPool(*pool);
+      std::unique_ptr<Reader> reader =
+        createReader(std::move(inStream), readerOptions);
+      EXPECT_EQ(rowCount, reader->getNumberOfRows());
+      std::unique_ptr<RowReader> rowReader =
+        reader->createRowReader(RowReaderOptions());
+      auto batch = rowReader->createRowBatch(1000);
+      // seek over the empty present stream
+      rowReader->seekToRow(2000);
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(1000, batch->numElements);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& structBatch1 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[0]);
+      auto& structBatch2 = dynamic_cast<StructVectorBatch&>(*structBatch.fields[1]);
+      auto& listBatch = dynamic_cast<ListVectorBatch&>(*structBatch.fields[2]);
+      auto& mapBatch = dynamic_cast<MapVectorBatch&>(*structBatch.fields[3]);
+
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch1.fields[0]);
+      for (uint64_t i = 0; i < 1000; ++i) {
+        EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i + 2000));
+        EXPECT_TRUE(longBatch1.notNull[i]);
+        EXPECT_FALSE(structBatch2.notNull[i]);
+        EXPECT_FALSE(listBatch.notNull[i]);
+        EXPECT_FALSE(mapBatch.notNull[i]);
+      }
+    }
+  }
 }  // namespace
diff --git a/c++/test/TestRleDecoder.cc b/c++/test/TestRleDecoder.cc
index 1b4ca4e89..3f2409bdb 100644
--- a/c++/test/TestRleDecoder.cc
+++ b/c++/test/TestRleDecoder.cc
@@ -2986,6 +2986,17 @@ TEST(RLEv1, testLeadingNulls) {
   for (size_t i = 5; i < 10; ++i) {
     EXPECT_EQ(i - 4, data[i]) << "Output wrong at " << i;
   }
-};
+}
+
+TEST(RLEv1, seekOverEmptyPresentStream) {
+  const char* buffer = nullptr;
+  std::unique_ptr<RleDecoder> rle =
+      createRleDecoder(std::unique_ptr<SeekableInputStream>
+                      (new SeekableArrayInputStream(buffer, 0, 1)),
+                      false, RleVersion_1, *getDefaultPool());
+  std::list<uint64_t> position(2, 0);
+  PositionProvider location(position);
+  rle->seek(location);
+}
 
 }  // namespace orc

Reply via email to