This is an automated email from the ASF dual-hosted git repository.
ffacs pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 8003801e7 ORC-1730: [C++] Add finishEncode support for the encoder
8003801e7 is described below
commit 8003801e78ff6156a1f18ec62b631d6ba4768b00
Author: luffy-zh <[email protected]>
AuthorDate: Thu Jun 20 14:10:21 2024 +0800
ORC-1730: [C++] Add finishEncode support for the encoder
### What changes were proposed in this pull request?
Add finishEncode() to the RLE encoders (RleEncoder and ByteRleEncoder) and
implement finishStream() in BufferedOutputStream and the compression streams.
### Why are the changes needed?
Encoders need a way to finish encoding without flushing the output stream, so
that compression blocks can be aligned with row-group boundaries.
### How was this patch tested?
The unit tests in testRleEncode() cover this patch.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #1956 from luffy-zh/ORC-1264.
Authored-by: luffy-zh <[email protected]>
Signed-off-by: ffacs <[email protected]>
---
c++/src/ByteRLE.cc | 9 ++++++++
c++/src/ByteRLE.hh | 7 +++++++
c++/src/Compression.cc | 10 +++++++++
c++/src/RLE.cc | 6 ++++++
c++/src/RLE.hh | 7 +++++++
c++/src/RLEv1.cc | 9 +++++---
c++/src/RLEv1.hh | 2 ++
c++/src/RLEv2.hh | 2 ++
c++/src/RleEncoderV2.cc | 51 ++++++++++++++++++++++++----------------------
c++/src/io/OutputStream.cc | 4 ++++
c++/src/io/OutputStream.hh | 1 +
c++/test/TestRleEncoder.cc | 17 +++++++++++++---
12 files changed, 95 insertions(+), 30 deletions(-)
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index 02a3e4041..ded9f55a0 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -63,6 +63,8 @@ namespace orc {
virtual void suppress() override;
+ virtual void finishEncode() override;
+
/**
* Reset to initial state
*/
@@ -216,6 +218,13 @@ namespace orc {
reset();
}
+ // Finish the current encoding without flushing the output stream:
+ // write out buffered values, hand the unused tail of the current output
+ // buffer back to the stream, let the stream finish its pending block,
+ // and reset the buffer indices.
+ void ByteRleEncoderImpl::finishEncode() {
+ writeValues();
+ // NOTE(review): RleEncoder::finishEncode wraps this difference in
+ // static_cast<int>; presumably these members are already int here — confirm.
+ outputStream->BackUp(bufferLength - bufferPosition);
+ outputStream->finishStream();
+ bufferLength = bufferPosition = 0;
+ }
+
std::unique_ptr<ByteRleEncoder> createByteRleEncoder(
std::unique_ptr<BufferedOutputStream> output) {
return std::make_unique<ByteRleEncoderImpl>(std::move(output));
diff --git a/c++/src/ByteRLE.hh b/c++/src/ByteRLE.hh
index bd19f52ec..bee064f66 100644
--- a/c++/src/ByteRLE.hh
+++ b/c++/src/ByteRLE.hh
@@ -59,6 +59,13 @@ namespace orc {
* suppress the data and reset to initial state
*/
virtual void suppress() = 0;
+
+ /**
+ * Finalize the encoding process. Call this after all data to be encoded
+ * has been added: it writes out any pending values, returns unused buffer
+ * space to the output stream, and asks the stream to finish its current
+ * block (see finishStream()). It does not flush the output stream.
+ */
+ virtual void finishEncode() = 0;
};
class ByteRleDecoder {
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index a315820a8..535018dcb 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -67,6 +67,7 @@ namespace orc {
}
virtual uint64_t getSize() const override;
virtual uint64_t getRawInputBufferSize() const override = 0;
+ virtual void finishStream() override = 0;
protected:
void writeData(const unsigned char* data, int size);
@@ -173,6 +174,9 @@ namespace orc {
uint64_t getRawInputBufferSize() const override {
return rawInputBuffer.size();
}
+ // End the current compression block by compressing whatever input is
+ // buffered so far.
+ // NOTE(review): compressInternal() is not shown here; confirm it writes the
+ // block header/data and resets the raw input buffer.
+ virtual void finishStream() override {
+ compressInternal();
+ }
protected:
// return total compressed size
@@ -953,6 +957,8 @@ namespace orc {
return rawInputBuffer.size();
}
+ virtual void finishStream() override;
+
protected:
// compresses a block and returns the compressed size
virtual uint64_t doBlockCompression() = 0;
@@ -1024,6 +1030,10 @@ namespace orc {
BufferedOutputStream::suppress();
}
+ // End the current compression block at the caller's boundary.
+ // NOTE(review): doBlockCompression() is declared to return the compressed
+ // size, and that value is discarded here. Unless doBlockCompression() itself
+ // writes the header and compressed bytes and resets the input buffer, the
+ // block may never reach the output, and a later flush() may compress the
+ // same input again — verify against Next()/flush().
+ void BlockCompressionStream::finishStream() {
+ doBlockCompression();
+ }
+
/**
* LZ4 block compression
*/
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 23168ff7f..cb831c80f 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -121,4 +121,10 @@ namespace orc {
recorder->add(static_cast<uint64_t>(numLiterals));
}
+ // Base implementation: does NOT write pending literals/runs. Subclasses
+ // (RleEncoderV1, RleEncoderV2) write those first and then call this to
+ // return unused buffer space, finish the stream's current block, and
+ // reset the buffer indices.
+ void RleEncoder::finishEncode() {
+ // bufferLength/bufferPosition are size_t; BackUp takes an int.
+ outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
+ outputStream->finishStream();
+ bufferLength = bufferPosition = 0;
+ }
+
} // namespace orc
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index a45b4056b..e46504e88 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -84,6 +84,13 @@ namespace orc {
virtual void write(int64_t val) = 0;
+ /**
+ * Finalize the encoding process. Call this after all data to be encoded
+ * has been added. The base implementation only returns unused buffer space
+ * to the output stream and finishes the stream's current block; overrides
+ * must write any pending values before calling it. It does not flush the
+ * output stream (flush() does that after calling finishEncode()).
+ */
+ virtual void finishEncode();
+
protected:
std::unique_ptr<BufferedOutputStream> outputStream;
size_t bufferPosition;
diff --git a/c++/src/RLEv1.cc b/c++/src/RLEv1.cc
index 5d6f60066..72c555e61 100644
--- a/c++/src/RLEv1.cc
+++ b/c++/src/RLEv1.cc
@@ -74,10 +74,8 @@ namespace orc {
}
uint64_t RleEncoderV1::flush() {
- writeValues();
- outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
+ // finishEncode() writes pending values, backs up the unused buffer,
+ // finishes the stream's current block and resets the buffer indices.
+ finishEncode();
uint64_t dataSize = outputStream->flush();
- bufferLength = bufferPosition = 0;
return dataSize;
}
@@ -135,6 +133,11 @@ namespace orc {
}
}
+ // Write out the pending RLEv1 values, then let the base class return
+ // unused buffer space and finish the stream's current block.
+ void RleEncoderV1::finishEncode() {
+ writeValues();
+ RleEncoder::finishEncode();
+ }
+
signed char RleDecoderV1::readByte() {
SCOPED_MINUS_STOPWATCH(metrics, DecodingLatencyUs);
if (bufferStart_ == bufferEnd_) {
diff --git a/c++/src/RLEv1.hh b/c++/src/RLEv1.hh
index a2a00c930..024b1e5e9 100644
--- a/c++/src/RLEv1.hh
+++ b/c++/src/RLEv1.hh
@@ -38,6 +38,8 @@ namespace orc {
void write(int64_t val) override;
+ void finishEncode() override;
+
private:
int64_t delta_;
bool repeat_;
diff --git a/c++/src/RLEv2.hh b/c++/src/RLEv2.hh
index a8e0340e7..8ceb7f125 100644
--- a/c++/src/RLEv2.hh
+++ b/c++/src/RLEv2.hh
@@ -108,6 +108,8 @@ namespace orc {
void write(int64_t val) override;
+ void finishEncode() override;
+
private:
const bool alignedBitPacking_;
uint32_t fixedRunLength_;
diff --git a/c++/src/RleEncoderV2.cc b/c++/src/RleEncoderV2.cc
index 18c520025..1cda9ee91 100644
--- a/c++/src/RleEncoderV2.cc
+++ b/c++/src/RleEncoderV2.cc
@@ -440,31 +440,8 @@ namespace orc {
}
uint64_t RleEncoderV2::flush() {
- if (numLiterals != 0) {
- EncodingOption option = {};
- if (variableRunLength_ != 0) {
- determineEncoding(option);
- writeValues(option);
- } else if (fixedRunLength_ != 0) {
- if (fixedRunLength_ < MIN_REPEAT) {
- variableRunLength_ = fixedRunLength_;
- fixedRunLength_ = 0;
- determineEncoding(option);
- writeValues(option);
- } else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= MAX_SHORT_REPEAT_LENGTH) {
- option.encoding = SHORT_REPEAT;
- writeValues(option);
- } else {
- option.encoding = DELTA;
- option.isFixedDelta = true;
- writeValues(option);
- }
- }
- }
-
- outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
+ // The pending-value encoding moved into finishEncode(), which also backs
+ // up the buffer, finishes the stream's current block and resets indices.
+ finishEncode();
uint64_t dataSize = outputStream->flush();
- bufferLength = bufferPosition = 0;
return dataSize;
}
@@ -779,4 +756,30 @@ namespace orc {
fixedRunLength_ = 1;
variableRunLength_ = 1;
}
+
+ // Encode any values still pending, choosing the RLEv2 encoding from the
+ // current run state (variableRunLength_ / fixedRunLength_). Then let the
+ // base class return unused buffer space and finish the stream's block.
+ void RleEncoderV2::finishEncode() {
+ if (numLiterals != 0) {
+ EncodingOption option = {};
+ if (variableRunLength_ != 0) {
+ determineEncoding(option);
+ writeValues(option);
+ } else if (fixedRunLength_ != 0) {
+ if (fixedRunLength_ < MIN_REPEAT) {
+ // Too few repeats for a repeat encoding: treat them as a variable run.
+ variableRunLength_ = fixedRunLength_;
+ fixedRunLength_ = 0;
+ determineEncoding(option);
+ writeValues(option);
+ } else if (fixedRunLength_ >= MIN_REPEAT && fixedRunLength_ <= MAX_SHORT_REPEAT_LENGTH) {
+ // NOTE(review): `fixedRunLength_ >= MIN_REPEAT` is always true here because
+ // the previous branch handles `< MIN_REPEAT`; kept as in the old flush().
+ option.encoding = SHORT_REPEAT;
+ writeValues(option);
+ } else {
+ // Fixed run longer than MAX_SHORT_REPEAT_LENGTH: emit as a fixed DELTA run.
+ option.encoding = DELTA;
+ option.isFixedDelta = true;
+ writeValues(option);
+ }
+ }
+ }
+
+ RleEncoder::finishEncode();
+ }
} // namespace orc
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index 26b5f7e5d..aa4dbe6ed 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -61,6 +61,10 @@ namespace orc {
}
}
+ void BufferedOutputStream::finishStream() {
+ // Intentionally a no-op: the base (uncompressed) stream has no pending
+ // block to finalize. Compression streams override this to close out
+ // their current compression block.
+ }
+
google::protobuf::int64 BufferedOutputStream::ByteCount() const {
return static_cast<google::protobuf::int64>(dataBuffer_->size());
}
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index a869632e6..4908f34f2 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -74,6 +74,7 @@ namespace orc {
virtual bool isCompressed() const {
return false;
}
+ virtual void finishStream();
};
DIAGNOSTIC_POP
diff --git a/c++/test/TestRleEncoder.cc b/c++/test/TestRleEncoder.cc
index 1c24a6951..d458236cb 100644
--- a/c++/test/TestRleEncoder.cc
+++ b/c++/test/TestRleEncoder.cc
@@ -84,8 +84,8 @@ namespace orc {
std::make_unique<SeekableArrayInputStream>(memStream.getData(),
memStream.getLength()),
isSinged, version, *getDefaultPool(), getDefaultReaderMetrics());
- int64_t* decodedData = new int64_t[numValues];
- decoder->next(decodedData, numValues, notNull);
+ std::vector<int64_t> decodedData(numValues);
+ decoder->next(decodedData.data(), numValues, notNull);
for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
@@ -93,7 +93,12 @@ namespace orc {
}
}
- delete[] decodedData;
+ decoder->next(decodedData.data(), numValues, notNull);
+ for (uint64_t i = 0; i < numValues; ++i) {
+ if (!notNull || notNull[i]) {
+ EXPECT_EQ(data[i], decodedData[i]);
+ }
+ }
}
std::unique_ptr<RleEncoder> RleTest::getEncoder(RleVersion version,
MemoryOutputStream& memStream,
@@ -128,6 +133,9 @@ namespace orc {
char* notNull = numNulls == 0 ? nullptr : new char[numValues];
int64_t* data = new int64_t[numValues];
generateData(numValues, start, delta, random, data, numNulls, notNull);
+ encoder->add(data, numValues, notNull);
+ encoder->finishEncode();
+
encoder->add(data, numValues, notNull);
encoder->flush();
@@ -243,6 +251,9 @@ namespace orc {
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
std::unique_ptr<RleEncoder> encoder = getEncoder(RleVersion_2, memStream,
isSigned);
+ encoder->add(data, numValues, nullptr);
+ encoder->finishEncode();
+
encoder->add(data, numValues, nullptr);
encoder->flush();