This is an automated email from the ASF dual-hosted git repository.

ffacs pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 8003801e7 ORC-1730: [C++] Add finishEncode support for the encoder
8003801e7 is described below

commit 8003801e78ff6156a1f18ec62b631d6ba4768b00
Author: luffy-zh <[email protected]>
AuthorDate: Thu Jun 20 14:10:21 2024 +0800

    ORC-1730: [C++] Add finishEncode support for the encoder
    
    ### What changes were proposed in this pull request?
    Add finishEncode() to the RLE encoders and implement finishStream() in BufferedOutputStream and the compression streams.
    
    ### Why are the changes needed?
    We need to be able to finish encoding when a compression block is aligned with a row group boundary.
    
    ### How was this patch tested?
    Unit tests in testRleEncode() cover this patch.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #1956 from luffy-zh/ORC-1264.
    
    Authored-by: luffy-zh <[email protected]>
    Signed-off-by: ffacs <[email protected]>
---
 c++/src/ByteRLE.cc         |  9 ++++++++
 c++/src/ByteRLE.hh         |  7 +++++++
 c++/src/Compression.cc     | 10 +++++++++
 c++/src/RLE.cc             |  6 ++++++
 c++/src/RLE.hh             |  7 +++++++
 c++/src/RLEv1.cc           |  9 +++++---
 c++/src/RLEv1.hh           |  2 ++
 c++/src/RLEv2.hh           |  2 ++
 c++/src/RleEncoderV2.cc    | 51 ++++++++++++++++++++++++----------------------
 c++/src/io/OutputStream.cc |  4 ++++
 c++/src/io/OutputStream.hh |  1 +
 c++/test/TestRleEncoder.cc | 17 +++++++++++++---
 12 files changed, 95 insertions(+), 30 deletions(-)

diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index 02a3e4041..ded9f55a0 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -63,6 +63,8 @@ namespace orc {
 
     virtual void suppress() override;
 
+    virtual void finishEncode() override;
+
     /**
      * Reset to initial state
      */
@@ -216,6 +218,13 @@ namespace orc {
     reset();
   }
 
+  void ByteRleEncoderImpl::finishEncode() {
+    writeValues();
+    outputStream->BackUp(bufferLength - bufferPosition);
+    outputStream->finishStream();
+    bufferLength = bufferPosition = 0;
+  }
+
   std::unique_ptr<ByteRleEncoder> createByteRleEncoder(
       std::unique_ptr<BufferedOutputStream> output) {
     return std::make_unique<ByteRleEncoderImpl>(std::move(output));
diff --git a/c++/src/ByteRLE.hh b/c++/src/ByteRLE.hh
index bd19f52ec..bee064f66 100644
--- a/c++/src/ByteRLE.hh
+++ b/c++/src/ByteRLE.hh
@@ -59,6 +59,13 @@ namespace orc {
      * suppress the data and reset to initial state
      */
     virtual void suppress() = 0;
+
+    /**
+     * Finalize the encoding process. This function should be called after all 
data required for
+     * encoding has been added. It ensures that any remaining data is 
processed and the final state
+     * of the encoder is set.
+     */
+    virtual void finishEncode() = 0;
   };
 
   class ByteRleDecoder {
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index a315820a8..535018dcb 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -67,6 +67,7 @@ namespace orc {
     }
     virtual uint64_t getSize() const override;
     virtual uint64_t getRawInputBufferSize() const override = 0;
+    virtual void finishStream() override = 0;
 
    protected:
     void writeData(const unsigned char* data, int size);
@@ -173,6 +174,9 @@ namespace orc {
     uint64_t getRawInputBufferSize() const override {
       return rawInputBuffer.size();
     }
+    virtual void finishStream() override {
+      compressInternal();
+    }
 
    protected:
     // return total compressed size
@@ -953,6 +957,8 @@ namespace orc {
       return rawInputBuffer.size();
     }
 
+    virtual void finishStream() override;
+
    protected:
     // compresses a block and returns the compressed size
     virtual uint64_t doBlockCompression() = 0;
@@ -1024,6 +1030,10 @@ namespace orc {
     BufferedOutputStream::suppress();
   }
 
+  void BlockCompressionStream::finishStream() {
+    doBlockCompression();
+  }
+
   /**
    * LZ4 block compression
    */
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 23168ff7f..cb831c80f 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -121,4 +121,10 @@ namespace orc {
     recorder->add(static_cast<uint64_t>(numLiterals));
   }
 
+  void RleEncoder::finishEncode() {
+    outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
+    outputStream->finishStream();
+    bufferLength = bufferPosition = 0;
+  }
+
 }  // namespace orc
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index a45b4056b..e46504e88 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -84,6 +84,13 @@ namespace orc {
 
     virtual void write(int64_t val) = 0;
 
+    /**
+     * Finalize the encoding process. This function should be called after all 
data required for
+     * encoding has been added. It ensures that any remaining data is 
processed and the final state
+     * of the encoder is set.
+     */
+    virtual void finishEncode();
+
    protected:
     std::unique_ptr<BufferedOutputStream> outputStream;
     size_t bufferPosition;
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index 5d6f60066..72c555e61 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -74,10 +74,8 @@ namespace orc {
   }
 
   uint64_t RleEncoderV1::flush() {
-    writeValues();
-    outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
+    finishEncode();
     uint64_t dataSize = outputStream->flush();
-    bufferLength = bufferPosition = 0;
     return dataSize;
   }
 
@@ -135,6 +133,11 @@ namespace orc {
     }
   }
 
+  void RleEncoderV1::finishEncode() {
+    writeValues();
+    RleEncoder::finishEncode();
+  }
+
   signed char RleDecoderV1::readByte() {
     SCOPED_MINUS_STOPWATCH(metrics, DecodingLatencyUs);
     if (bufferStart_ == bufferEnd_) {
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index a2a00c930..024b1e5e9 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -38,6 +38,8 @@ namespace orc {
 
     void write(int64_t val) override;
 
+    void finishEncode() override;
+
    private:
     int64_t delta_;
     bool repeat_;
diff --git a/c++/src/RLEv2.hh b/c++/src/RLEv2.hh
index a8e0340e7..8ceb7f125 100644
--- a/c++/src/RLEv2.hh
+++ b/c++/src/RLEv2.hh
@@ -108,6 +108,8 @@ namespace orc {
 
     void write(int64_t val) override;
 
+    void finishEncode() override;
+
    private:
     const bool alignedBitPacking_;
     uint32_t fixedRunLength_;
diff --git a/c++/src/RleEncoderV2.cc b/c++/src/RleEncoderV2.cc
index 18c520025..1cda9ee91 100644
--- a/c++/src/RleEncoderV2.cc
+++ b/c++/src/RleEncoderV2.cc
@@ -440,31 +440,8 @@ namespace orc {
   }
 
   uint64_t RleEncoderV2::flush() {
-    if (numLiterals != 0) {
-      EncodingOption option = {};
-      if (variableRunLength_ != 0) {
-        determineEncoding(option);
-        writeValues(option);
-      } else if (fixedRunLength_ != 0) {
-        if (fixedRunLength_ < MIN_REPEAT) {
-          variableRunLength_ = fixedRunLength_;
-          fixedRunLength_ = 0;
-          determineEncoding(option);
-          writeValues(option);
-        } else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= 
MAX_SHORT_REPEAT_LENGTH) {
-          option.encoding = SHORT_REPEAT;
-          writeValues(option);
-        } else {
-          option.encoding = DELTA;
-          option.isFixedDelta = true;
-          writeValues(option);
-        }
-      }
-    }
-
-    outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
+    finishEncode();
     uint64_t dataSize = outputStream->flush();
-    bufferLength = bufferPosition = 0;
     return dataSize;
   }
 
@@ -779,4 +756,30 @@ namespace orc {
     fixedRunLength_ = 1;
     variableRunLength_ = 1;
   }
+
+  void RleEncoderV2::finishEncode() {
+    if (numLiterals != 0) {
+      EncodingOption option = {};
+      if (variableRunLength_ != 0) {
+        determineEncoding(option);
+        writeValues(option);
+      } else if (fixedRunLength_ != 0) {
+        if (fixedRunLength_ < MIN_REPEAT) {
+          variableRunLength_ = fixedRunLength_;
+          fixedRunLength_ = 0;
+          determineEncoding(option);
+          writeValues(option);
+        } else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= 
MAX_SHORT_REPEAT_LENGTH) {
+          option.encoding = SHORT_REPEAT;
+          writeValues(option);
+        } else {
+          option.encoding = DELTA;
+          option.isFixedDelta = true;
+          writeValues(option);
+        }
+      }
+    }
+
+    RleEncoder::finishEncode();
+  }
 }  // namespace orc
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index 26b5f7e5d..aa4dbe6ed 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -61,6 +61,10 @@ namespace orc {
     }
   }
 
+  void BufferedOutputStream::finishStream() {
+    // PASS
+  }
+
   google::protobuf::int64 BufferedOutputStream::ByteCount() const {
     return static_cast<google::protobuf::int64>(dataBuffer_->size());
   }
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index a869632e6..4908f34f2 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -74,6 +74,7 @@ namespace orc {
     virtual bool isCompressed() const {
       return false;
     }
+    virtual void finishStream();
   };
   DIAGNOSTIC_POP
 
diff --git a/c++/test/TestRleEncoder.cc b/c++/test/TestRleEncoder.cc
index 1c24a6951..d458236cb 100644
--- a/c++/test/TestRleEncoder.cc
+++ b/c++/test/TestRleEncoder.cc
@@ -84,8 +84,8 @@ namespace orc {
         std::make_unique<SeekableArrayInputStream>(memStream.getData(), 
memStream.getLength()),
         isSinged, version, *getDefaultPool(), getDefaultReaderMetrics());
 
-    int64_t* decodedData = new int64_t[numValues];
-    decoder->next(decodedData, numValues, notNull);
+    std::vector<int64_t> decodedData(numValues);
+    decoder->next(decodedData.data(), numValues, notNull);
 
     for (uint64_t i = 0; i < numValues; ++i) {
       if (!notNull || notNull[i]) {
@@ -93,7 +93,12 @@ namespace orc {
       }
     }
 
-    delete[] decodedData;
+    decoder->next(decodedData.data(), numValues, notNull);
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (!notNull || notNull[i]) {
+        EXPECT_EQ(data[i], decodedData[i]);
+      }
+    }
   }
 
   std::unique_ptr<RleEncoder> RleTest::getEncoder(RleVersion version, 
MemoryOutputStream& memStream,
@@ -128,6 +133,9 @@ namespace orc {
     char* notNull = numNulls == 0 ? nullptr : new char[numValues];
     int64_t* data = new int64_t[numValues];
     generateData(numValues, start, delta, random, data, numNulls, notNull);
+    encoder->add(data, numValues, notNull);
+    encoder->finishEncode();
+
     encoder->add(data, numValues, notNull);
     encoder->flush();
 
@@ -243,6 +251,9 @@ namespace orc {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
 
     std::unique_ptr<RleEncoder> encoder = getEncoder(RleVersion_2, memStream, 
isSigned);
+    encoder->add(data, numValues, nullptr);
+    encoder->finishEncode();
+
     encoder->add(data, numValues, nullptr);
     encoder->flush();
 

Reply via email to