This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 42aed71  ORC-1130:[C++] Suppress the present stream when the data stream has no null value
42aed71 is described below

commit 42aed7145444f614e3414bc29bb43378cc967003
Author: coderex2522 <[email protected]>
AuthorDate: Tue Mar 22 10:15:52 2022 +0800

    ORC-1130:[C++] Suppress the present stream when the data stream has no null value
    
    This closes #1067
---
 c++/src/ByteRLE.cc         | 29 +++++++++++---
 c++/src/ByteRLE.hh         |  5 +++
 c++/src/ColumnWriter.cc    | 32 +++++++++++++++-
 c++/src/ColumnWriter.hh    |  1 +
 c++/src/io/OutputStream.cc |  4 ++
 c++/src/io/OutputStream.hh |  1 +
 c++/test/TestWriter.cc     | 95 ++++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 159 insertions(+), 8 deletions(-)

diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index ee1a457..1c4a645 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -61,6 +61,13 @@ namespace orc {
 
     virtual void recordPosition(PositionRecorder* recorder) const override;
 
+    virtual void suppress() override;
+
+    /**
+     * Reset to initial state
+     */
+    void reset();
+
   protected:
     std::unique_ptr<BufferedOutputStream> outputStream;
     char* literals;
@@ -80,12 +87,7 @@ namespace orc {
                                 std::unique_ptr<BufferedOutputStream> output)
                                   : outputStream(std::move(output)) {
     literals = new char[MAX_LITERAL_SIZE];
-    numLiterals = 0;
-    tailRunLength = 0;
-    repeat = false;
-    bufferPosition = 0;
-    bufferLength = 0;
-    buffer = nullptr;
+    reset();
   }
 
   ByteRleEncoderImpl::~ByteRleEncoderImpl() {
@@ -203,6 +205,21 @@ namespace orc {
     recorder->add(static_cast<uint64_t>(numLiterals));
   }
 
+  void ByteRleEncoderImpl::reset() {
+    numLiterals = 0;
+    tailRunLength = 0;
+    repeat = false;
+    bufferPosition = 0;
+    bufferLength = 0;
+    buffer = nullptr;
+  }
+
+  void ByteRleEncoderImpl::suppress() {
+    // written data can be just ignored because they are only flushed in memory
+    outputStream->suppress();
+    reset();
+  }
+
   std::unique_ptr<ByteRleEncoder> createByteRleEncoder
                               (std::unique_ptr<BufferedOutputStream> output) {
     return std::unique_ptr<ByteRleEncoder>(new ByteRleEncoderImpl
diff --git a/c++/src/ByteRLE.hh b/c++/src/ByteRLE.hh
index 71ca579..2f6e2eb 100644
--- a/c++/src/ByteRLE.hh
+++ b/c++/src/ByteRLE.hh
@@ -55,6 +55,11 @@ namespace orc {
      * @param recorder use the recorder to record current positions
      */
     virtual void recordPosition(PositionRecorder* recorder) const = 0;
+
+    /**
+     * suppress the data and reset to initial state
+     */
+    virtual void suppress() = 0;
   };
 
   class ByteRleDecoder {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index fd77d70..559ca1a 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -100,7 +100,8 @@ namespace orc {
                                 enableBloomFilter(false),
                                 memPool(*options.getMemoryPool()),
                                 indexStream(),
-                                bloomFilterStream() {
+                                bloomFilterStream(),
+                                hasNullValue(false) {
 
     std::unique_ptr<BufferedOutputStream> presentStream =
         factory.createStream(proto::Stream_Kind_PRESENT);
@@ -139,10 +140,22 @@ namespace orc {
                          uint64_t offset,
                          uint64_t numValues,
                          const char* incomingMask) {
-    notNullEncoder->add(batch.notNull.data() + offset, numValues, incomingMask);
+    const char* notNull = batch.notNull.data() + offset;
+    notNullEncoder->add(notNull, numValues, incomingMask);
+    hasNullValue |= batch.hasNulls;
+    for (uint64_t i = 0; !hasNullValue && i < numValues; ++i) {
+      if (!notNull[i]) {
+        hasNullValue = true;
+      }
+    }
   }
 
   void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    if (!hasNullValue) {
+      // supress the present stream
+      notNullEncoder->suppress();
+      return;
+    }
     proto::Stream stream;
     stream.set_kind(proto::Stream_Kind_PRESENT);
     stream.set_column(static_cast<uint32_t>(columnId));
@@ -199,6 +212,21 @@ namespace orc {
   }
 
   void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
+    if (!hasNullValue) {
+      // remove positions of present stream
+      int presentCount = indexStream->isCompressed() ? 4 : 3;
+      for (int i = 0; i != rowIndex->entry_size(); ++i) {
+        proto::RowIndexEntry* entry = rowIndex->mutable_entry(i);
+        std::vector<uint64_t> positions;
+        for (int j = presentCount; j < entry->positions_size(); ++j) {
+          positions.push_back(entry->positions(j));
+        }
+        entry->clear_positions();
+        for (size_t j = 0; j != positions.size(); ++j) {
+          entry->add_positions(positions[j]);
+        }
+      }
+    }
     // write row index to output stream
     rowIndex->SerializeToZeroCopyStream(indexStream.get());
 
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index cbbb5d0..2098377 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -207,6 +207,7 @@ namespace orc {
     MemoryPool& memPool;
     std::unique_ptr<BufferedOutputStream> indexStream;
     std::unique_ptr<BufferedOutputStream> bloomFilterStream;
+    bool hasNullValue;
   };
 
   /**
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index fd71c4b..8770d28 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -97,6 +97,10 @@ namespace orc {
     return dataSize;
   }
 
+  void BufferedOutputStream::suppress() {
+    dataBuffer->resize(0);
+  }
+
   void AppendOnlyBufferedStream::write(const char * data, size_t size) {
     size_t dataOffset = 0;
     while (size > 0) {
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index 6d04629..69c06d5 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -62,6 +62,7 @@ namespace orc {
     virtual std::string getName() const;
     virtual uint64_t getSize() const;
     virtual uint64_t flush();
+    virtual void suppress();
 
     virtual bool isCompressed() const { return false; }
   };
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index c2e50c1..8df2c22 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -21,6 +21,7 @@
 
 #include "MemoryInputStream.hh"
 #include "MemoryOutputStream.hh"
+#include "Reader.hh"
 
 #include "wrap/gmock.h"
 #include "wrap/gtest-wrapper.h"
@@ -1901,5 +1902,99 @@ namespace orc {
     }
   }
 
+  TEST(WriterTest, testSuppressPresentStream) {
+    MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    size_t rowCount = 2000;
+    {
+      auto type = std::unique_ptr<Type>(
+        Type::buildTypeFromString("struct<col1:int,col2:int>"));
+      WriterOptions options;
+      options.setStripeSize(1024 * 1024)
+          .setCompressionBlockSize(1024)
+          .setCompression(CompressionKind_NONE)
+          .setMemoryPool(pool)
+          .setRowIndexStride(1000);
+
+      auto writer = createWriter(*type, &memStream, options);
+      auto batch = writer->createRowBatch(rowCount);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+      auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+      structBatch.numElements = rowCount;
+      longBatch1.numElements = rowCount;
+      longBatch2.numElements = rowCount;
+      longBatch1.hasNulls = true;
+      for (size_t i = 0; i < rowCount; ++i) {
+        if (i % 2 == 0) {
+          longBatch1.notNull[i] = 0;
+        } else {
+          longBatch1.notNull[i] = 1;
+          longBatch1.data[i] = static_cast<int64_t>(i*100);
+        }
+        longBatch2.data[i] = static_cast<int64_t>(i*300);
+      }
+      writer->add(*batch);
+      writer->close();
+    }
+    // read file & check the present stream
+    {
+      std::unique_ptr<InputStream> inStream(
+        new MemoryInputStream(memStream.getData(), memStream.getLength()));
+      ReaderOptions readerOptions;
+      readerOptions.setMemoryPool(*pool);
+      std::unique_ptr<Reader> reader =
+        createReader(std::move(inStream), readerOptions);
+      EXPECT_EQ(rowCount, reader->getNumberOfRows());
+      std::unique_ptr<RowReader> rowReader = createRowReader(reader.get());
+      auto batch = rowReader->createRowBatch(1000);
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(1000, batch->numElements);
+      auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
+      auto& longBatch1 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[0]);
+      auto& longBatch2 = dynamic_cast<LongVectorBatch&>(*structBatch.fields[1]);
+      for (size_t i = 0; i < 1000; ++i) {
+        if (i % 2 == 0) {
+          EXPECT_FALSE(longBatch1.notNull[i]);
+         } else {
+          EXPECT_TRUE(longBatch1.notNull[i]);
+          EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>(i*100));
+        }
+        EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>(i*300));
+      }
+      // Read rows 1500 - 2000
+      rowReader->seekToRow(1500);
+      EXPECT_TRUE(rowReader->next(*batch));
+      EXPECT_EQ(500, batch->numElements);
+      for (size_t i = 0; i < 500; ++i) {
+        if (i % 2 == 0) {
+          EXPECT_FALSE(longBatch1.notNull[i]);
+         } else {
+          EXPECT_TRUE(longBatch1.notNull[i]);
+          EXPECT_EQ(longBatch1.data[i], static_cast<int64_t>((i + 1500)*100));
+        }
+        EXPECT_EQ(longBatch2.data[i], static_cast<int64_t>((i + 1500)*300));
+      }
+      // fetch StripeFooter from pb stream
+      std::unique_ptr<StripeInformation> stripeInfo = reader->getStripe(0);
+      ReaderImpl* readerImpl = dynamic_cast<ReaderImpl*>(reader.get());
+      std::unique_ptr<SeekableInputStream> pbStream(
+        new SeekableFileInputStream(readerImpl->getStream(),
+        stripeInfo->getOffset() + stripeInfo->getIndexLength() + stripeInfo->getDataLength(),
+        stripeInfo->getFooterLength(),
+        *pool));
+      proto::StripeFooter stripeFooter;
+      if (!stripeFooter.ParseFromZeroCopyStream(pbStream.get())) {
+        throw ParseError("Parse stripe footer from pb stream failed");
+      }
+      for (int i = 0; i < stripeFooter.streams_size(); ++i) {
+        const proto::Stream& stream = stripeFooter.streams(i);
+        if (stream.has_kind() && stream.kind() == proto::Stream_Kind_PRESENT) {
+          EXPECT_EQ(stream.column(), 1UL);
+        }
+      }
+    }
+  }
+
   INSTANTIATE_TEST_CASE_P(OrcTest, WriterTest, Values(FileVersion::v_0_11(), FileVersion::v_0_12()));
 }

Reply via email to