This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 08c3480ac ORC-1264: [C++] Add a writer option to align compression 
block with row group
08c3480ac is described below

commit 08c3480aceeb2805cadc594c181b7c3c835d19c3
Author: luffy-zh <[email protected]>
AuthorDate: Thu Nov 7 12:05:25 2024 -0800

    ORC-1264: [C++] Add a writer option to align compression block with row 
group
    
    ## What changes were proposed in this pull request?
    Add support for the ORC writer to ensure that the compression block is 
aligned with the row group boundary。
    
    ## Why are the changes needed?
    To reduce unnecessary I/O and decompression when PPD is in effect, we can 
enforce the compression block to be aligned with the row group boundary. For 
more detail, see 
[link](https://issues.apache.org/jira/projects/ORC/issues/ORC-1264?filter=allopenissues)
    
    ## How was this patch tested?
    Uts in TestWriter.cc can convert this patch.
    
    ## Was this patch authored or co-authored using generative AI tooling?
    NO
    
    Closes #2005 from luffy-zh/ORC-1264.
    
    Lead-authored-by: luffy-zh <[email protected]>
    Co-authored-by: Hao Zou <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 c++/include/orc/Reader.hh  |  19 ++++++
 c++/include/orc/Writer.hh  |  13 ++++
 c++/src/ColumnWriter.cc    | 111 +++++++++++++++++++++++++++++++
 c++/src/ColumnWriter.hh    |  12 ++++
 c++/src/Compression.cc     |  17 ++---
 c++/src/Reader.cc          |  67 ++++++++++++++++---
 c++/src/Reader.hh          |  11 ++--
 c++/src/Writer.cc          |  14 ++++
 c++/src/io/OutputStream.cc |  11 +++-
 c++/src/io/OutputStream.hh |   1 +
 c++/test/TestWriter.cc     | 159 ++++++++++++++++++++++++++++++++++++++-------
 11 files changed, 386 insertions(+), 49 deletions(-)

diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 4b254593e..4ddce64ad 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -62,6 +62,15 @@ namespace orc {
   };
   ReaderMetrics* getDefaultReaderMetrics();
 
+  // Row group index of a single column in a stripe.
+  struct RowGroupIndex {
+    // Positions are represented as a two-dimensional array where the first
+    // dimension is row group index and the second dimension is the position
+    // list of the row group. The size of the second dimension should be equal
+    // among all row groups.
+    std::vector<std::vector<uint64_t>> positions;
+  };
+
   /**
    * Options for creating a Reader.
    */
@@ -605,6 +614,16 @@ namespace orc {
      */
     virtual std::map<uint32_t, BloomFilterIndex> getBloomFilters(
         uint32_t stripeIndex, const std::set<uint32_t>& included) const = 0;
+
+    /**
+     * Get row group index of all selected columns in the specified stripe
+     * @param stripeIndex index of the stripe to be read for row group index.
+     * @param included index of selected columns to return (if not specified,
+     *        all columns will be returned).
+     * @return map of row group index keyed by its column index.
+     */
+    virtual std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
+        uint32_t stripeIndex, const std::set<uint32_t>& included = {}) const = 
0;
   };
 
   /**
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index b560627c4..78f06739b 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -290,6 +290,19 @@ namespace orc {
      * @return if not set, return default value which is 64 KB.
      */
     uint64_t getMemoryBlockSize() const;
+
+    /**
+     * Set whether the compression block should be aligned to row group 
boundary.
+     * The boolean type may not be aligned to row group boundary due to the
+     * requirement of the Boolean RLE encoder to pack input bits into bytes
+     */
+    WriterOptions& setAlignBlockBoundToRowGroup(bool 
alignBlockBoundToRowGroup);
+
+    /**
+     * Get if the compression block should be aligned to row group boundary.
+     * @return if not set, return default value which is false.
+     */
+    bool getAlignBlockBoundToRowGroup() const;
   };
 
   class Writer {
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 7adca1440..d31b1c65d 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -254,6 +254,10 @@ namespace orc {
     // PASS
   }
 
+  void ColumnWriter::finishStreams() {
+    notNullEncoder->finishEncode();
+  }
+
   class StructColumnWriter : public ColumnWriter {
    public:
     StructColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -283,6 +287,8 @@ namespace orc {
 
     virtual void reset() override;
 
+    virtual void finishStreams() override;
+
    private:
     std::vector<std::unique_ptr<ColumnWriter>> children_;
   };
@@ -416,6 +422,13 @@ namespace orc {
     }
   }
 
+  void StructColumnWriter::finishStreams() {
+    ColumnWriter::finishStreams();
+    for (uint32_t i = 0; i < children_.size(); ++i) {
+      children_[i]->finishStreams();
+    }
+  }
+
   template <typename BatchType>
   class IntegerColumnWriter : public ColumnWriter {
    public:
@@ -433,6 +446,8 @@ namespace orc {
 
     virtual void recordPosition() const override;
 
+    virtual void finishStreams() override;
+
    protected:
     std::unique_ptr<RleEncoder> rleEncoder;
 
@@ -528,6 +543,12 @@ namespace orc {
     rleEncoder->recordPosition(rowIndexPosition.get());
   }
 
+  template <typename BatchType>
+  void IntegerColumnWriter<BatchType>::finishStreams() {
+    ColumnWriter::finishStreams();
+    rleEncoder->finishEncode();
+  }
+
   template <typename BatchType>
   class ByteColumnWriter : public ColumnWriter {
    public:
@@ -544,6 +565,8 @@ namespace orc {
 
     virtual void recordPosition() const override;
 
+    virtual void finishStreams() override;
+
    private:
     std::unique_ptr<ByteRleEncoder> byteRleEncoder_;
   };
@@ -637,6 +660,12 @@ namespace orc {
     byteRleEncoder_->recordPosition(rowIndexPosition.get());
   }
 
+  template <typename BatchType>
+  void ByteColumnWriter<BatchType>::finishStreams() {
+    ColumnWriter::finishStreams();
+    byteRleEncoder_->finishEncode();
+  }
+
   template <typename BatchType>
   class BooleanColumnWriter : public ColumnWriter {
    public:
@@ -654,6 +683,8 @@ namespace orc {
 
     virtual void recordPosition() const override;
 
+    virtual void finishStreams() override;
+
    private:
     std::unique_ptr<ByteRleEncoder> rleEncoder_;
   };
@@ -750,6 +781,12 @@ namespace orc {
     rleEncoder_->recordPosition(rowIndexPosition.get());
   }
 
+  template <typename BatchType>
+  void BooleanColumnWriter<BatchType>::finishStreams() {
+    ColumnWriter::finishStreams();
+    rleEncoder_->finishEncode();
+  }
+
   template <typename ValueType, typename BatchType>
   class FloatingColumnWriter : public ColumnWriter {
    public:
@@ -767,6 +804,8 @@ namespace orc {
 
     virtual void recordPosition() const override;
 
+    virtual void finishStreams() override;
+
    private:
     bool isFloat_;
     std::unique_ptr<AppendOnlyBufferedStream> dataStream_;
@@ -878,6 +917,12 @@ namespace orc {
     dataStream_->recordPosition(rowIndexPosition.get());
   }
 
+  template <typename ValueType, typename BatchType>
+  void FloatingColumnWriter<ValueType, BatchType>::finishStreams() {
+    ColumnWriter::finishStreams();
+    dataStream_->finishStream();
+  }
+
   /**
    * Implementation of increasing sorted string dictionary
    */
@@ -1041,6 +1086,8 @@ namespace orc {
 
     virtual void reset() override;
 
+    virtual void finishStreams() override;
+
    private:
     /**
      * dictionary related functions
@@ -1234,6 +1281,14 @@ namespace orc {
     }
   }
 
+  void StringColumnWriter::finishStreams() {
+    ColumnWriter::finishStreams();
+    if (!useDictionary) {
+      directDataStream->finishStream();
+      directLengthEncoder->finishEncode();
+    }
+  }
+
   bool StringColumnWriter::checkDictionaryKeyRatio() {
     if (!doneDictionaryCheck) {
       useDictionary = dictionary.size() <=
@@ -1583,6 +1638,8 @@ namespace orc {
 
     virtual void recordPosition() const override;
 
+    virtual void finishStreams() override;
+
    protected:
     std::unique_ptr<RleEncoder> secRleEncoder, nanoRleEncoder;
 
@@ -1723,6 +1780,12 @@ namespace orc {
     nanoRleEncoder->recordPosition(rowIndexPosition.get());
   }
 
+  void TimestampColumnWriter::finishStreams() {
+    ColumnWriter::finishStreams();
+    secRleEncoder->finishEncode();
+    nanoRleEncoder->finishEncode();
+  }
+
   class DateColumnWriter : public IntegerColumnWriter<LongVectorBatch> {
    public:
     DateColumnWriter(const Type& type, const StreamsFactory& factory, const 
WriterOptions& options);
@@ -1792,6 +1855,8 @@ namespace orc {
 
     virtual void recordPosition() const override;
 
+    virtual void finishStreams() override;
+
    protected:
     RleVersion rleVersion;
     uint64_t precision;
@@ -1910,6 +1975,12 @@ namespace orc {
     scaleEncoder->recordPosition(rowIndexPosition.get());
   }
 
+  void Decimal64ColumnWriter::finishStreams() {
+    ColumnWriter::finishStreams();
+    valueStream->finishStream();
+    scaleEncoder->finishEncode();
+  }
+
   class Decimal64ColumnWriterV2 : public ColumnWriter {
    public:
     Decimal64ColumnWriterV2(const Type& type, const StreamsFactory& factory,
@@ -1926,6 +1997,8 @@ namespace orc {
 
     virtual void recordPosition() const override;
 
+    virtual void finishStreams() override;
+
    protected:
     uint64_t precision;
     uint64_t scale;
@@ -2016,6 +2089,11 @@ namespace orc {
     valueEncoder->recordPosition(rowIndexPosition.get());
   }
 
+  void Decimal64ColumnWriterV2::finishStreams() {
+    ColumnWriter::finishStreams();
+    valueEncoder->finishEncode();
+  }
+
   class Decimal128ColumnWriter : public Decimal64ColumnWriter {
    public:
     Decimal128ColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -2131,6 +2209,8 @@ namespace orc {
 
     virtual void reset() override;
 
+    virtual void finishStreams() override;
+
    private:
     std::unique_ptr<RleEncoder> lengthEncoder_;
     RleVersion rleVersion_;
@@ -2307,6 +2387,14 @@ namespace orc {
     }
   }
 
+  void ListColumnWriter::finishStreams() {
+    ColumnWriter::finishStreams();
+    lengthEncoder_->finishEncode();
+    if (child_) {
+      child_->finishStreams();
+    }
+  }
+
   class MapColumnWriter : public ColumnWriter {
    public:
     MapColumnWriter(const Type& type, const StreamsFactory& factory, const 
WriterOptions& options);
@@ -2339,6 +2427,8 @@ namespace orc {
 
     virtual void reset() override;
 
+    virtual void finishStreams() override;
+
    private:
     std::unique_ptr<ColumnWriter> keyWriter_;
     std::unique_ptr<ColumnWriter> elemWriter_;
@@ -2557,6 +2647,17 @@ namespace orc {
     }
   }
 
+  void MapColumnWriter::finishStreams() {
+    ColumnWriter::finishStreams();
+    lengthEncoder_->finishEncode();
+    if (keyWriter_) {
+      keyWriter_->finishStreams();
+    }
+    if (elemWriter_) {
+      elemWriter_->finishStreams();
+    }
+  }
+
   class UnionColumnWriter : public ColumnWriter {
    public:
     UnionColumnWriter(const Type& type, const StreamsFactory& factory,
@@ -2589,6 +2690,8 @@ namespace orc {
 
     virtual void reset() override;
 
+    virtual void finishStreams() override;
+
    private:
     std::unique_ptr<ByteRleEncoder> rleEncoder_;
     std::vector<std::unique_ptr<ColumnWriter>> children_;
@@ -2760,6 +2863,14 @@ namespace orc {
     }
   }
 
+  void UnionColumnWriter::finishStreams() {
+    ColumnWriter::finishStreams();
+    rleEncoder_->finishEncode();
+    for (uint32_t i = 0; i < children_.size(); ++i) {
+      children_[i]->finishStreams();
+    }
+  }
+
   std::unique_ptr<ColumnWriter> buildWriter(const Type& type, const 
StreamsFactory& factory,
                                             const WriterOptions& options) {
     switch (static_cast<int64_t>(type.getKind())) {
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
index 8afd1eb72..1c5e15d70 100644
--- a/c++/src/ColumnWriter.hh
+++ b/c++/src/ColumnWriter.hh
@@ -179,6 +179,18 @@ namespace orc {
      */
     virtual void writeDictionary();
 
+    /**
+     * Finalize the encoding and compressing process. This function should be
+     * called after all data required for encoding has been added. It ensures
+     * that any remaining data is processed and the final state of the streams
+     * is set.
+     * Note: boolean type cannot cut off the current byte if it is not filled
+     * with 8 bits, otherwise Boolean RLE may incorrectly read the unfilled
+     * trailing bits. In this case, the last byte will be the head of the next
+     * compression block.
+     */
+    virtual void finishStreams();
+
    protected:
     /**
      * Utility function to translate ColumnStatistics into protobuf form and
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index b5ca5a4c9..f373a75bf 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -176,6 +176,7 @@ namespace orc {
     }
     virtual void finishStream() override {
       compressInternal();
+      BufferedOutputStream::finishStream();
     }
 
    protected:
@@ -982,13 +983,7 @@ namespace orc {
   }
 
   uint64_t BlockCompressionStream::flush() {
-    void* data;
-    int size;
-    if (!Next(&data, &size)) {
-      throw CompressionError("Failed to flush compression buffer.");
-    }
-    BufferedOutputStream::BackUp(outputSize - outputPosition);
-    bufferSize = outputSize = outputPosition = 0;
+    finishStream();
     return BufferedOutputStream::flush();
   }
 
@@ -1031,7 +1026,13 @@ namespace orc {
   }
 
   void BlockCompressionStream::finishStream() {
-    doBlockCompression();
+    void* data;
+    int size;
+    if (!Next(&data, &size)) {
+      throw CompressionError("Failed to flush compression buffer.");
+    }
+    BufferedOutputStream::BackUp(outputSize - outputPosition);
+    bufferSize = outputSize = outputPosition = 0;
   }
 
   /**
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 2966c2c2e..034ea04ee 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -1426,17 +1426,10 @@ namespace orc {
       uint32_t stripeIndex, const std::set<uint32_t>& included) const {
     std::map<uint32_t, BloomFilterIndex> ret;
 
-    // find stripe info
-    if (stripeIndex >= static_cast<uint32_t>(footer_->stripes_size())) {
-      throw std::logic_error("Illegal stripe index: " +
-                             to_string(static_cast<int64_t>(stripeIndex)));
-    }
-    const proto::StripeInformation currentStripeInfo =
-        footer_->stripes(static_cast<int>(stripeIndex));
-    const proto::StripeFooter currentStripeFooter = 
getStripeFooter(currentStripeInfo, *contents_);
+    uint64_t offset;
+    auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset);
 
     // iterate stripe footer to get stream of bloom_filter
-    uint64_t offset = static_cast<uint64_t>(currentStripeInfo.offset());
     for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
       const proto::Stream& stream = currentStripeFooter.streams(i);
       uint32_t column = static_cast<uint32_t>(stream.column());
@@ -1474,6 +1467,62 @@ namespace orc {
     return ret;
   }
 
+  proto::StripeFooter ReaderImpl::loadCurrentStripeFooter(uint32_t stripeIndex,
+                                                          uint64_t& offset) 
const {
+    // find stripe info
+    if (stripeIndex >= static_cast<uint32_t>(footer_->stripes_size())) {
+      throw std::logic_error("Illegal stripe index: " +
+                             to_string(static_cast<int64_t>(stripeIndex)));
+    }
+    const proto::StripeInformation currentStripeInfo =
+        footer_->stripes(static_cast<int>(stripeIndex));
+    offset = static_cast<uint64_t>(currentStripeInfo.offset());
+    return getStripeFooter(currentStripeInfo, *contents_);
+  }
+
+  std::map<uint32_t, RowGroupIndex> ReaderImpl::getRowGroupIndex(
+      uint32_t stripeIndex, const std::set<uint32_t>& included) const {
+    std::map<uint32_t, RowGroupIndex> ret;
+    uint64_t offset;
+    auto currentStripeFooter = loadCurrentStripeFooter(stripeIndex, offset);
+
+    // iterate stripe footer to get stream of row_index
+    for (int i = 0; i < currentStripeFooter.streams_size(); i++) {
+      const proto::Stream& stream = currentStripeFooter.streams(i);
+      uint32_t column = static_cast<uint32_t>(stream.column());
+      uint64_t length = static_cast<uint64_t>(stream.length());
+      RowGroupIndex& rowGroupIndex = ret[column];
+
+      if (stream.kind() == proto::Stream_Kind_ROW_INDEX &&
+          (included.empty() || included.find(column) != included.end())) {
+        std::unique_ptr<SeekableInputStream> pbStream =
+            createDecompressor(contents_->compression,
+                               std::make_unique<SeekableFileInputStream>(
+                                   contents_->stream.get(), offset, length, 
*contents_->pool),
+                               contents_->blockSize, *(contents_->pool), 
contents_->readerMetrics);
+
+        proto::RowIndex pbRowIndex;
+        if (!pbRowIndex.ParseFromZeroCopyStream(pbStream.get())) {
+          std::stringstream errMsgBuffer;
+          errMsgBuffer << "Failed to parse RowIndex at column " << column << " 
in stripe "
+                       << stripeIndex;
+          throw ParseError(errMsgBuffer.str());
+        }
+
+        // add rowGroupIndex to result for one column
+        for (auto& rowIndexEntry : pbRowIndex.entry()) {
+          std::vector<uint64_t> posVector;
+          for (auto& position : rowIndexEntry.positions()) {
+            posVector.push_back(position);
+          }
+          rowGroupIndex.positions.push_back(posVector);
+        }
+      }
+      offset += length;
+    }
+    return ret;
+  }
+
   RowReader::~RowReader() {
     // PASS
   }
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 630d812c3..89606c331 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -265,10 +265,10 @@ namespace orc {
     // internal methods
     void readMetadata() const;
     void checkOrcVersion();
-    void getRowIndexStatistics(
-        const proto::StripeInformation& stripeInfo, uint64_t stripeIndex,
-        const proto::StripeFooter& currentStripeFooter,
-        std::vector<std::vector<proto::ColumnStatistics> >* indexStats) const;
+    void getRowIndexStatistics(const proto::StripeInformation& stripeInfo, 
uint64_t stripeIndex,
+                               const proto::StripeFooter& currentStripeFooter,
+                               
std::vector<std::vector<proto::ColumnStatistics>>* indexStats) const;
+    proto::StripeFooter loadCurrentStripeFooter(uint32_t stripeIndex, 
uint64_t& offset) const;
 
     // metadata
     mutable bool isMetadataLoaded_;
@@ -374,6 +374,9 @@ namespace orc {
 
     std::map<uint32_t, BloomFilterIndex> getBloomFilters(
         uint32_t stripeIndex, const std::set<uint32_t>& included) const 
override;
+
+    std::map<uint32_t, RowGroupIndex> getRowGroupIndex(
+        uint32_t stripeIndex, const std::set<uint32_t>& included) const 
override;
   };
 }  // namespace orc
 
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 531b56655..775e6d245 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -47,6 +47,7 @@ namespace orc {
     bool useTightNumericVector;
     uint64_t outputBufferCapacity;
     uint64_t memoryBlockSize;
+    bool alignBlockBoundToRowGroup;
 
     WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) {  // default 
to Hive_0_12
       stripeSize = 64 * 1024 * 1024;                               // 64M
@@ -69,6 +70,7 @@ namespace orc {
       useTightNumericVector = false;
       outputBufferCapacity = 1024 * 1024;
       memoryBlockSize = 64 * 1024;  // 64K
+      alignBlockBoundToRowGroup = false;
     }
   };
 
@@ -298,6 +300,15 @@ namespace orc {
     return privateBits_->memoryBlockSize;
   }
 
+  WriterOptions& WriterOptions::setAlignBlockBoundToRowGroup(bool 
alignBlockBoundToRowGroup) {
+    privateBits_->alignBlockBoundToRowGroup = alignBlockBoundToRowGroup;
+    return *this;
+  }
+
+  bool WriterOptions::getAlignBlockBoundToRowGroup() const {
+    return privateBits_->alignBlockBoundToRowGroup;
+  }
+
   Writer::~Writer() {
     // PASS
   }
@@ -401,6 +412,9 @@ namespace orc {
         stripeRows_ += chunkSize;
 
         if (indexRows_ >= rowIndexStride) {
+          if (options_.getAlignBlockBoundToRowGroup()) {
+            columnWriter_->finishStreams();
+          }
           columnWriter_->createRowIndexEntry();
           indexRows_ = 0;
         }
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index aa4dbe6ed..fbf1ca61d 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -128,9 +128,7 @@ namespace orc {
   }
 
   uint64_t AppendOnlyBufferedStream::flush() {
-    outStream_->BackUp(bufferLength_ - bufferOffset_);
-    bufferOffset_ = bufferLength_ = 0;
-    buffer_ = nullptr;
+    finishStream();
     return outStream_->flush();
   }
 
@@ -150,4 +148,11 @@ namespace orc {
     }
   }
 
+  void AppendOnlyBufferedStream::finishStream() {
+    outStream_->BackUp(bufferLength_ - bufferOffset_);
+    outStream_->finishStream();
+    bufferOffset_ = bufferLength_ = 0;
+    buffer_ = nullptr;
+  }
+
 }  // namespace orc
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index 4908f34f2..6319de96d 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -100,6 +100,7 @@ namespace orc {
     void write(const char* data, size_t size);
     uint64_t getSize() const;
     uint64_t flush();
+    void finishStream();
 
     void recordPosition(PositionRecorder* recorder) const;
   };
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 8bc4032a5..975462e30 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -57,6 +57,8 @@ namespace orc {
     options.setTimezoneName(timezone);
     options.setUseTightNumericVector(useTightNumericVector);
     options.setMemoryBlockSize(memoryBlockSize);
+    // enable align block bound to row group when stride is not 0
+    options.setAlignBlockBoundToRowGroup(true);
     return createWriter(type, stream, options);
   }
 
@@ -84,7 +86,56 @@ namespace orc {
     return reader->createRowReader(rowReaderOpts);
   }
 
-  class WriterTest : public TestWithParam<FileVersion> {
+  void verifyCompressionBlockAlignment(std::unique_ptr<Reader>& reader, 
uint64_t columnCount) {
+    auto stripeCount = reader->getNumberOfStripes();
+    for (uint64_t stripeIndex = 0; stripeIndex < stripeCount; ++stripeIndex) {
+      for (uint64_t i = 0; i < columnCount; ++i) {
+        auto rowGroupIndexMap = reader->getRowGroupIndex(stripeIndex);
+        EXPECT_TRUE(rowGroupIndexMap.size() > 0);
+        auto rowGroupIndex = rowGroupIndexMap[columnCount];
+        auto subType = reader->getType().getSubtype(i);
+        EXPECT_TRUE(rowGroupIndex.positions.size() > 0);
+        for (auto rowGroupPositions : rowGroupIndex.positions) {
+          for (uint64_t posIndex = 0; posIndex < rowGroupPositions.size(); 
++posIndex) {
+            // After we call finishStream(), unusedBufferSize is set to 0,
+            // so only the first position is valid in each recordPosition call.
+            switch (subType->getKind()) {
+              case DECIMAL:
+              case STRING:
+              case BINARY:
+              case CHAR:
+              case VARCHAR: {
+                if (posIndex != 0 && posIndex != 2) {
+                  EXPECT_EQ(rowGroupPositions[posIndex], 0);
+                }
+                break;
+              }
+              case TIMESTAMP_INSTANT:
+              case TIMESTAMP: {
+                if (posIndex != 0 && posIndex != 3) {
+                  EXPECT_EQ(rowGroupPositions[posIndex], 0);
+                }
+                break;
+              }
+              default: {
+                if (posIndex != 0) {
+                  EXPECT_EQ(rowGroupPositions[posIndex], 0);
+                }
+                break;
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  struct TestParams {
+    FileVersion fileVersion;
+    bool enableAlignBlockBoundToRowGroup;
+  };
+
+  class WriterTest : public TestWithParam<TestParams> {
     // You can implement all the usual fixture class members here.
     // To access the test parameter, call GetParam() from class
     // TestWithParam<T>.
@@ -92,13 +143,15 @@ namespace orc {
 
    protected:
     FileVersion fileVersion;
+    bool enableAlignBlockBoundToRowGroup;
 
    public:
-    WriterTest() : fileVersion(FileVersion::v_0_11()) {}
+    WriterTest() : fileVersion(FileVersion::v_0_11()), 
enableAlignBlockBoundToRowGroup(false) {}
   };
 
   void WriterTest::SetUp() {
-    fileVersion = GetParam();
+    fileVersion = GetParam().fileVersion;
+    enableAlignBlockBoundToRowGroup = 
GetParam().enableAlignBlockBoundToRowGroup;
   }
 
   TEST_P(WriterTest, writeEmptyFile) {
@@ -252,7 +305,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     StringVectorBatch* strBatch = 
dynamic_cast<StringVectorBatch*>(structBatch->fields[0]);
@@ -294,6 +347,9 @@ namespace orc {
       EXPECT_EQ(i, static_cast<uint64_t>(atoi(str.c_str())));
       EXPECT_EQ(i, static_cast<uint64_t>(atoi(bin.c_str())));
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
 
     EXPECT_FALSE(rowReader->next(*batch));
   }
@@ -315,7 +371,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     DoubleVectorBatch* doubleBatch = 
dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]);
@@ -351,6 +407,10 @@ namespace orc {
                   0.000001f);
     }
     EXPECT_FALSE(rowReader->next(*batch));
+
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeShortIntLong) {
@@ -366,7 +426,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* smallIntBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -403,6 +463,9 @@ namespace orc {
       EXPECT_EQ(static_cast<int32_t>(i), intBatch->data[i]);
       EXPECT_EQ(static_cast<int64_t>(i), bigIntBatch->data[i]);
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeTinyint) {
@@ -415,9 +478,9 @@ namespace orc {
     uint64_t rowCount = 65535;
     uint64_t memoryBlockSize = 64;
 
-    std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZSTD, *type,
-                     pool, &memStream, fileVersion, 1024, "GMT", true);
+    std::unique_ptr<Writer> writer = createWriter(
+        stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZSTD, *type, pool,
+        &memStream, fileVersion, enableAlignBlockBoundToRowGroup ? 1024 : 0, 
"GMT", true);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     ByteVectorBatch* byteBatch = 
dynamic_cast<ByteVectorBatch*>(structBatch->fields[0]);
@@ -442,6 +505,9 @@ namespace orc {
     batch = rowReader->createRowBatch(rowCount);
     rowReader->seekToRow(20);
     EXPECT_EQ(true, rowReader->next(*batch));
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
 
     structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
     auto outByteBatch = dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -474,7 +540,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* byteBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -501,6 +567,9 @@ namespace orc {
     for (uint64_t i = 0; i < rowCount; ++i) {
       EXPECT_EQ((i % 3) == 0 ? 1 : 0, byteBatch->data[i]);
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeDate) {
@@ -515,7 +584,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
 
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -543,6 +612,9 @@ namespace orc {
     for (uint64_t i = 0; i < rowCount; ++i) {
       EXPECT_EQ(static_cast<int32_t>(i), longBatch->data[i]);
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeTimestamp) {
@@ -557,7 +629,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     TimestampVectorBatch* tsBatch = 
dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]);
@@ -589,14 +661,18 @@ namespace orc {
       EXPECT_EQ(times[i], tsBatch->data[i]);
       EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]);
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeNegativeTimestamp) {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
     MemoryPool* pool = getDefaultPool();
     std::unique_ptr<Type> 
type(Type::buildTypeFromString("struct<a:timestamp>"));
-    auto writer = createWriter(16 * 1024 * 1024, 64 * 1024, 256 * 1024, 
CompressionKind_ZLIB, *type,
-                               pool, &memStream, fileVersion);
+    auto writer =
+        createWriter(16 * 1024 * 1024, 64 * 1024, 256 * 1024, 
CompressionKind_ZLIB, *type, pool,
+                     &memStream, fileVersion, enableAlignBlockBoundToRowGroup 
? 1024 : 0);
     uint64_t batchCount = 5;
     auto batch = writer->createRowBatch(batchCount * 2);
     auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
@@ -646,6 +722,10 @@ namespace orc {
       }
       EXPECT_EQ(1000000, tsBatch->nanoseconds[i]);
     }
+
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
 // TODO: Disable the test below for Windows for following reasons:
@@ -766,7 +846,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     TimestampVectorBatch* tsBatch = 
dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]);
@@ -798,6 +878,9 @@ namespace orc {
       EXPECT_EQ(times[i], tsBatch->data[i]);
       EXPECT_EQ(i * 1000, tsBatch->nanoseconds[i]);
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeCharAndVarcharColumn) {
@@ -815,7 +898,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
 
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -877,6 +960,9 @@ namespace orc {
     }
 
     EXPECT_FALSE(rowReader->next(*batch));
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeDecimal64Column) {
@@ -892,7 +978,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     Decimal64VectorBatch* decBatch = 
dynamic_cast<Decimal64VectorBatch*>(structBatch->fields[0]);
@@ -954,6 +1040,9 @@ namespace orc {
       EXPECT_EQ(dec, decBatch->values[i]);
       EXPECT_EQ(-dec, decBatch->values[i + maxPrecision]);
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeDecimal128Column) {
@@ -969,7 +1058,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     Decimal128VectorBatch* decBatch = 
dynamic_cast<Decimal128VectorBatch*>(structBatch->fields[0]);
@@ -1041,6 +1130,9 @@ namespace orc {
       EXPECT_EQ(expected, decBatch->values[i].toString());
       EXPECT_EQ("-" + expected, decBatch->values[i + maxPrecision].toString());
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeListColumn) {
@@ -1058,7 +1150,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount 
* maxListLength);
 
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -1104,6 +1196,9 @@ namespace orc {
         EXPECT_EQ(static_cast<int64_t>(i), data[offsets[i] + j]);
       }
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeMapColumn) {
@@ -1118,7 +1213,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount 
* maxListLength);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     MapVectorBatch* mapBatch = 
dynamic_cast<MapVectorBatch*>(structBatch->fields[0]);
@@ -1185,6 +1280,9 @@ namespace orc {
         EXPECT_EQ(static_cast<int64_t>(i), elemData[offsets[i] + j]);
       }
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeUnionColumn) {
@@ -1200,7 +1298,7 @@ namespace orc {
 
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     UnionVectorBatch* unionBatch = 
dynamic_cast<UnionVectorBatch*>(structBatch->fields[0]);
@@ -1282,6 +1380,9 @@ namespace orc {
           break;
       }
     }
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, writeUTF8CharAndVarcharColumn) {
@@ -1295,7 +1396,7 @@ namespace orc {
     uint64_t memoryBlockSize = 64;
     std::unique_ptr<Writer> writer =
         createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
-                     pool, &memStream, fileVersion);
+                     pool, &memStream, fileVersion, 
enableAlignBlockBoundToRowGroup ? 1024 : 0);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     StringVectorBatch* charBatch = 
dynamic_cast<StringVectorBatch*>(structBatch->fields[0]);
@@ -1353,6 +1454,9 @@ namespace orc {
     EXPECT_TRUE(memcmp(varcharBatch->data[2], expectedTwoChars, 4) == 0);
 
     EXPECT_FALSE(rowReader->next(*batch));
+    if (enableAlignBlockBoundToRowGroup) {
+      verifyCompressionBlockAlignment(reader, type->getSubtypeCount());
+    }
   }
 
   TEST_P(WriterTest, testWriteListColumnWithNull) {
@@ -2296,7 +2400,12 @@ namespace orc {
     EXPECT_FALSE(rowReader->next(*batch));
   }
 
-  INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest,
-                           Values(FileVersion::v_0_11(), FileVersion::v_0_12(),
-                                  FileVersion::UNSTABLE_PRE_2_0()));
+  std::vector<TestParams> testParams = {{FileVersion::v_0_11(), true},
+                                        {FileVersion::v_0_11(), false},
+                                        {FileVersion::v_0_12(), false},
+                                        {FileVersion::v_0_12(), true},
+                                        {FileVersion::UNSTABLE_PRE_2_0(), 
false},
+                                        {FileVersion::UNSTABLE_PRE_2_0(), 
true}};
+
+  INSTANTIATE_TEST_SUITE_P(OrcTest, WriterTest, 
::testing::ValuesIn(testParams));
 }  // namespace orc


Reply via email to