This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 634d9b014 ORC-1365: [C++] Use BlockBuffer to replace dataBuffer of
rawInputBuffer
634d9b014 is described below
commit 634d9b0147f7b0a59bf7afa82fbee5959cada9b9
Author: luffy-zh <[email protected]>
AuthorDate: Wed May 8 12:35:28 2024 +0800
ORC-1365: [C++] Use BlockBuffer to replace dataBuffer of rawInputBuffer
### What changes were proposed in this pull request?
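This PR replaces the DataBuffer that CompressionStream uses as its raw input
buffer with a BlockBuffer. This way the compressor's input buffer can start
with a small initial size and grow automatically as needed. rawInputBuffer
moves from CompressionStreamBase into the subclasses: BlockCompressionStream
keeps a DataBuffer. Each subclass now implements its own BackUp(), flush() and
suppress().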
### Why are the changes needed?
Previously CompressionStream allocated its raw input buffer as a DataBuffer of
the full compression block size up front. A BlockBuffer instead allocates
memory as data is written, which addresses
[ORC-1365](https://issues.apache.org/jira/browse/ORC-1365).
### How was this patch tested?
The existing unit tests in TestBufferedOutputStream.cc and TestCompression.cc
cover this patch.
Closes #1916 from luffy-zh/ORC-1365.
Authored-by: luffy-zh <[email protected]>
Signed-off-by: Gang Wu <[email protected]>
---
c++/src/BlockBuffer.hh | 2 +
c++/src/Compression.cc | 171 +++++++++++++++++++++++++++++++------------------
2 files changed, 112 insertions(+), 61 deletions(-)
diff --git a/c++/src/BlockBuffer.hh b/c++/src/BlockBuffer.hh
index 2faf38f7f..6d265b0e3 100644
--- a/c++/src/BlockBuffer.hh
+++ b/c++/src/BlockBuffer.hh
@@ -106,12 +106,14 @@ namespace orc {
}
void resize(uint64_t size);
+
/**
* Requests the BlockBuffer to contain at least newCapacity bytes.
* Reallocation happens if there is need of more space.
* @param newCapacity new capacity of BlockBuffer
*/
void reserve(uint64_t newCapacity);
+
/**
* Write the BlockBuffer content into OutputStream
* @param output the output stream to write to
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index 4002276e1..a1595da49 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -55,11 +55,11 @@ namespace orc {
uint64_t blockSize, MemoryPool& pool, WriterMetrics*
metrics);
virtual bool Next(void** data, int* size) override = 0;
- virtual void BackUp(int count) override;
+ virtual void BackUp(int count) override = 0;
virtual std::string getName() const override = 0;
- virtual uint64_t flush() override;
- virtual void suppress() override;
+ virtual uint64_t flush() override = 0;
+ virtual void suppress() override = 0;
virtual bool isCompressed() const override {
return true;
@@ -78,9 +78,6 @@ namespace orc {
// ensure enough room for compression block header
void ensureHeader();
- // Buffer to hold uncompressed data until user calls Next()
- DataBuffer<unsigned char> rawInputBuffer;
-
// Compress level
int level;
@@ -105,7 +102,6 @@ namespace orc {
uint64_t capacity, uint64_t
blockSize,
MemoryPool& pool,
WriterMetrics* metrics)
: BufferedOutputStream(pool, outStream, capacity, blockSize, metrics),
- rawInputBuffer(pool, blockSize),
level(compressionLevel),
outputBuffer(nullptr),
bufferSize(0),
@@ -115,30 +111,6 @@ namespace orc {
header.fill(nullptr);
}
- void CompressionStreamBase::BackUp(int count) {
- if (count > bufferSize) {
- throw std::logic_error("Can't backup that much!");
- }
- bufferSize -= count;
- }
-
- uint64_t CompressionStreamBase::flush() {
- void* data;
- int size;
- if (!Next(&data, &size)) {
- throw std::runtime_error("Failed to flush compression buffer.");
- }
- BufferedOutputStream::BackUp(outputSize - outputPosition);
- bufferSize = outputSize = outputPosition = 0;
- return BufferedOutputStream::flush();
- }
-
- void CompressionStreamBase::suppress() {
- outputBuffer = nullptr;
- bufferSize = outputPosition = outputSize = 0;
- BufferedOutputStream::suppress();
- }
-
uint64_t CompressionStreamBase::getSize() const {
return BufferedOutputStream::getSize() - static_cast<uint64_t>(outputSize
- outputPosition);
}
@@ -187,27 +159,62 @@ namespace orc {
virtual bool Next(void** data, int* size) override;
virtual std::string getName() const override = 0;
+ virtual void BackUp(int count) override;
+ virtual void suppress() override;
+ virtual uint64_t flush() override;
protected:
// return total compressed size
virtual uint64_t doStreamingCompression() = 0;
+
+ // Buffer to hold uncompressed data until user calls Next()
+ BlockBuffer rawInputBuffer;
};
+ void CompressionStream::BackUp(int count) {
+ uint64_t backup = static_cast<uint64_t>(count);
+ uint64_t currSize = rawInputBuffer.size();
+ if (backup > currSize) {
+ throw std::logic_error("Can't backup that much!");
+ }
+ rawInputBuffer.resize(currSize - backup);
+ }
+
+ uint64_t CompressionStream::flush() {
+ void* data;
+ int size;
+ if (!Next(&data, &size)) {
+ throw std::runtime_error("Failed to flush compression buffer.");
+ }
+ BufferedOutputStream::BackUp(outputSize - outputPosition);
+ rawInputBuffer.resize(0);
+ outputSize = outputPosition = 0;
+ return BufferedOutputStream::flush();
+ }
+
+ void CompressionStream::suppress() {
+ outputBuffer = nullptr;
+ outputPosition = outputSize = 0;
+ rawInputBuffer.resize(0);
+ BufferedOutputStream::suppress();
+ }
+
CompressionStream::CompressionStream(OutputStream* outStream, int
compressionLevel,
uint64_t capacity, uint64_t blockSize,
MemoryPool& pool,
WriterMetrics* metrics)
- : CompressionStreamBase(outStream, compressionLevel, capacity,
blockSize, pool, metrics) {
+ : CompressionStreamBase(outStream, compressionLevel, capacity,
blockSize, pool, metrics),
+ rawInputBuffer(pool, blockSize) {
// PASS
}
bool CompressionStream::Next(void** data, int* size) {
- if (bufferSize != 0) {
+ if (rawInputBuffer.size() != 0) {
ensureHeader();
uint64_t preSize = getSize();
uint64_t totalCompressedSize = doStreamingCompression();
- if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
- writeHeader(static_cast<size_t>(bufferSize), true);
+ if (totalCompressedSize >= static_cast<unsigned
long>(rawInputBuffer.size())) {
+ writeHeader(static_cast<size_t>(rawInputBuffer.size()), true);
// reset output buffer
outputBuffer = nullptr;
outputPosition = outputSize = 0;
@@ -215,16 +222,20 @@ namespace orc {
BufferedOutputStream::BackUp(static_cast<int>(backup));
// copy raw input buffer into block buffer
- writeData(rawInputBuffer.data(), bufferSize);
+ uint64_t blockNumber = rawInputBuffer.getBlockNumber();
+ for (uint64_t i = 0; i < blockNumber; ++i) {
+ auto block = rawInputBuffer.getBlock(i);
+ writeData(reinterpret_cast<const unsigned char*>(block.data),
block.size);
+ }
} else {
writeHeader(totalCompressedSize, false);
}
+ rawInputBuffer.resize(0);
}
- *data = rawInputBuffer.data();
- *size = static_cast<int>(rawInputBuffer.size());
- bufferSize = *size;
-
+ auto block = rawInputBuffer.getNextBlock();
+ *data = block.data;
+ *size = static_cast<int>(block.size);
return true;
}
@@ -260,31 +271,43 @@ namespace orc {
throw std::runtime_error("Failed to reset inflate.");
}
- strm_.avail_in = static_cast<unsigned int>(bufferSize);
- strm_.next_in = rawInputBuffer.data();
+ // iterate through all blocks
+ uint64_t blockId = 0;
+ bool finish = false;
do {
- if (outputPosition >= outputSize) {
- if
(!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer),
&outputSize)) {
- throw std::runtime_error("Failed to get next output buffer from
output stream.");
- }
- outputPosition = 0;
+ if (blockId == rawInputBuffer.getBlockNumber()) {
+ finish = true;
+ strm_.avail_in = 0;
+ strm_.next_in = nullptr;
+ } else {
+ auto block = rawInputBuffer.getBlock(blockId++);
+ strm_.avail_in = static_cast<unsigned int>(block.size);
+ strm_.next_in = reinterpret_cast<unsigned char*>(block.data);
}
- strm_.next_out = reinterpret_cast<unsigned char*>(outputBuffer +
outputPosition);
- strm_.avail_out = static_cast<unsigned int>(outputSize - outputPosition);
- int ret = deflate(&strm_, Z_FINISH);
- outputPosition = outputSize - static_cast<int>(strm_.avail_out);
+ do {
+ if (outputPosition >= outputSize) {
+ if
(!BufferedOutputStream::Next(reinterpret_cast<void**>(&outputBuffer),
&outputSize)) {
+ throw std::runtime_error("Failed to get next output buffer from
output stream.");
+ }
+ outputPosition = 0;
+ }
+ strm_.next_out = reinterpret_cast<unsigned char*>(outputBuffer +
outputPosition);
+ strm_.avail_out = static_cast<unsigned int>(outputSize -
outputPosition);
- if (ret == Z_STREAM_END) {
- break;
- } else if (ret == Z_OK) {
- // needs more buffer so will continue the loop
- } else {
- throw std::runtime_error("Failed to deflate input data.");
- }
- } while (strm_.avail_out == 0);
+ int ret = deflate(&strm_, finish ? Z_FINISH : Z_NO_FLUSH);
+ outputPosition = outputSize - static_cast<int>(strm_.avail_out);
+ if (ret == Z_STREAM_END) {
+ break;
+ } else if (ret == Z_OK) {
+ // needs more buffer so will continue the loop
+ } else {
+ throw std::runtime_error("Failed to deflate input data.");
+ }
+ } while (strm_.avail_out == 0);
+ } while (!finish);
return strm_.total_out;
}
@@ -882,12 +905,15 @@ namespace orc {
BlockCompressionStream(OutputStream* outStream, int compressionLevel,
uint64_t capacity,
uint64_t blockSize, MemoryPool& pool,
WriterMetrics* metrics)
: CompressionStreamBase(outStream, compressionLevel, capacity,
blockSize, pool, metrics),
- compressorBuffer(pool) {
+ compressorBuffer(pool),
+ rawInputBuffer(pool, blockSize) {
// PASS
}
virtual bool Next(void** data, int* size) override;
virtual void suppress() override;
+ virtual void BackUp(int count) override;
+ virtual uint64_t flush() override;
virtual std::string getName() const override = 0;
protected:
@@ -900,8 +926,29 @@ namespace orc {
// should allocate max possible compressed size
DataBuffer<unsigned char> compressorBuffer;
+
+ // Buffer to hold uncompressed data until user calls Next()
+ DataBuffer<unsigned char> rawInputBuffer;
};
+ void BlockCompressionStream::BackUp(int count) {
+ if (count > bufferSize) {
+ throw std::logic_error("Can't backup that much!");
+ }
+ bufferSize -= count;
+ }
+
+ uint64_t BlockCompressionStream::flush() {
+ void* data;
+ int size;
+ if (!Next(&data, &size)) {
+ throw std::runtime_error("Failed to flush compression buffer.");
+ }
+ BufferedOutputStream::BackUp(outputSize - outputPosition);
+ bufferSize = outputSize = outputPosition = 0;
+ return BufferedOutputStream::flush();
+ }
+
bool BlockCompressionStream::Next(void** data, int* size) {
if (bufferSize != 0) {
ensureHeader();
@@ -935,7 +982,9 @@ namespace orc {
void BlockCompressionStream::suppress() {
compressorBuffer.resize(0);
- CompressionStreamBase::suppress();
+ outputBuffer = nullptr;
+ bufferSize = outputPosition = outputSize = 0;
+ BufferedOutputStream::suppress();
}
/**