This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 4da1acb3f ORC-1286: [C++] replace DataBuffer with BlockBuffer in class
BufferedOutputStream
4da1acb3f is described below
commit 4da1acb3fe9529db69687c047a2e3df5e52f6b32
Author: coderex2522 <[email protected]>
AuthorDate: Wed Oct 26 09:00:55 2022 -0700
ORC-1286: [C++] replace DataBuffer with BlockBuffer in class
BufferedOutputStream
### What changes were proposed in this pull request?
This PR reduces the large memory footprint of BufferedOutputStream and
refactors the data-writing logic in class CompressionStreamBase.
### Why are the changes needed?
This patch uses BlockBuffer in place of DataBuffer in class
BufferedOutputStream in order to resolve this
[issue](https://github.com/apache/orc/issues/1240).
### How was this patch tested?
The existing unit tests in TestBufferedOutputStream.cc and TestCompression.cc
cover this patch. A new unit test, TestBlockBuffer.write_to, is added.
Closes #1275 from coderex2522/ORC-1280-PART2.
Authored-by: coderex2522 <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
c++/src/BlockBuffer.cc | 53 +++++++++++++++++++
c++/src/BlockBuffer.hh | 9 ++++
c++/src/Compression.cc | 114 ++++++++++++++++++++++-------------------
c++/src/io/OutputStream.cc | 22 ++++----
c++/src/io/OutputStream.hh | 3 +-
c++/test/MemoryOutputStream.hh | 1 +
c++/test/TestBlockBuffer.cc | 40 +++++++++++++++
7 files changed, 177 insertions(+), 65 deletions(-)
diff --git a/c++/src/BlockBuffer.cc b/c++/src/BlockBuffer.cc
index aac0d4798..defd86ae9 100644
--- a/c++/src/BlockBuffer.cc
+++ b/c++/src/BlockBuffer.cc
@@ -17,6 +17,8 @@
*/
#include "BlockBuffer.hh"
+#include "orc/Writer.hh"
+#include "orc/OrcFile.hh"
#include <algorithm>
@@ -82,4 +84,55 @@ namespace orc {
}
}
}
+
+ /**
+  * Flush the bytes held by this BlockBuffer into the given output stream.
+  *
+  * Returns immediately when the buffer is empty. Otherwise the chunk size is
+  * the smaller of output->getNaturalWriteSize() and 1 GiB. A single block
+  * that fits into one chunk is written directly; in every other case the
+  * blocks are copied through a temporary chunk so that each write except
+  * possibly the last one is exactly chunkSize bytes long.
+  *
+  * @param output the output stream to write to
+  * @param metrics the metrics of the writer; may be nullptr, otherwise its
+  *                IOCount is increased by the number of write calls issued
+  * @throws std::logic_error if the natural write size of output is zero
+  */
+ void BlockBuffer::writeTo(OutputStream* output,
+                           WriterMetrics* metrics) {
+   if (currentSize == 0) {
+     return;
+   }
+   // upper bound (1 GiB) for a single write; never modified, hence const
+   static const uint64_t MAX_CHUNK_SIZE = 1024 * 1024 * 1024;
+   uint64_t chunkSize = std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
+   if (chunkSize == 0) {
+     throw std::logic_error("Natural write size cannot be zero");
+   }
+   uint64_t ioCount = 0;
+   uint64_t blockNumber = getBlockNumber();
+   // if only one block exists, currentSize is equal to the first block size
+   if (blockNumber == 1 && currentSize <= chunkSize) {
+     Block block = getBlock(0);
+     output->write(block.data, block.size);
+     ++ioCount;
+   } else {
+     char* chunk = memoryPool.malloc(chunkSize);
+     try {
+       uint64_t chunkOffset = 0;
+       for (uint64_t i = 0; i < blockNumber; ++i) {
+         Block block = getBlock(i);
+         uint64_t blockOffset = 0;
+         while (blockOffset < block.size) {
+           // copy as much of the current block as still fits into the chunk
+           uint64_t copySize =
+               std::min(chunkSize - chunkOffset, block.size - blockOffset);
+           memcpy(chunk + chunkOffset, block.data + blockOffset, copySize);
+           chunkOffset += copySize;
+           blockOffset += copySize;
+
+           // chunk is full
+           if (chunkOffset >= chunkSize) {
+             output->write(chunk, chunkSize);
+             chunkOffset = 0;
+             ++ioCount;
+           }
+         }
+       }
+       // write the partially filled tail chunk, if any
+       if (chunkOffset != 0) {
+         output->write(chunk, chunkOffset);
+         ++ioCount;
+       }
+     } catch (...) {
+       // do not leak the staging chunk when the copy/write loop throws
+       memoryPool.free(chunk);
+       throw;
+     }
+     memoryPool.free(chunk);
+   }
+
+   if (metrics != nullptr) {
+     metrics->IOCount.fetch_add(ioCount);
+   }
+ }
} // namespace orc
diff --git a/c++/src/BlockBuffer.hh b/c++/src/BlockBuffer.hh
index bb22b8a02..2869cce9b 100644
--- a/c++/src/BlockBuffer.hh
+++ b/c++/src/BlockBuffer.hh
@@ -25,6 +25,8 @@
namespace orc {
+ class OutputStream;
+ struct WriterMetrics;
/**
* BlockBuffer implements a memory allocation policy based on
* equal-length blocks. BlockBuffer will reserve multiple blocks
@@ -110,6 +112,13 @@ namespace orc {
* @param newCapacity new capacity of BlockBuffer
*/
void reserve(uint64_t newCapacity);
+ /**
+ * Write the BlockBuffer content into OutputStream.
+ * Nothing is written when the buffer is empty. The chunk size used for
+ * writing is the smaller of output->getNaturalWriteSize() and 1 GiB; when
+ * the content spans several blocks (or exceeds one chunk) it is copied
+ * through a temporary chunk of that size before being written.
+ * @param output the output stream to write to
+ * @param metrics the metrics of the writer; may be nullptr, otherwise its
+ * IOCount is increased by the number of writes performed
+ * @throws std::logic_error if the natural write size of output is zero
+ */
+ void writeTo(OutputStream* output,
+ WriterMetrics* metrics);
};
} // namespace orc
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index aa81deacf..5e256c5cd 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -24,6 +24,7 @@
#include "lz4.h"
#include <algorithm>
+#include <array>
#include <iomanip>
#include <iostream>
#include <sstream>
@@ -68,10 +69,12 @@ namespace orc {
virtual uint64_t getSize() const override;
protected:
- void writeHeader(char * buffer, size_t compressedSize, bool original) {
- buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 :
0));
- buffer[1] = static_cast<char>(compressedSize >> 7);
- buffer[2] = static_cast<char>(compressedSize >> 15);
+ void writeData(const unsigned char* data, int size);
+
+ void writeHeader(size_t compressedSize, bool original) {
+ *header[0] = static_cast<char>((compressedSize << 1) + (original ? 1 :
0));
+ *header[1] = static_cast<char>(compressedSize >> 7);
+ *header[2] = static_cast<char>(compressedSize >> 15);
}
// ensure enough room for compression block header
@@ -94,6 +97,10 @@ namespace orc {
// Compress output buffer size
int outputSize;
+
+ // Compression block header pointer array
+ static const uint32_t HEADER_SIZE = 3;
+ std::array<char*, HEADER_SIZE> header;
};
CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
@@ -113,7 +120,8 @@ namespace orc {
bufferSize(0),
outputPosition(0),
outputSize(0) {
- // PASS
+ // init header pointer array
+ header.fill(nullptr);
}
void CompressionStreamBase::BackUp(int count) {
@@ -145,19 +153,46 @@ namespace orc {
static_cast<uint64_t>(outputSize - outputPosition);
}
+ // Copy `size` bytes starting at `data` into the output buffer, requesting a
+ // fresh buffer from BufferedOutputStream::Next whenever the current one has
+ // been filled completely.
+ void CompressionStreamBase::writeData(const unsigned char* data, int size) {
+   for (int written = 0; written < size;) {
+     if (outputPosition > outputSize) {
+       // for safety this will unlikely happen
+       throw std::logic_error(
+           "Write to an out-of-bound place during compression!");
+     }
+     if (outputPosition == outputSize) {
+       // current buffer is exhausted, move on to the next one
+       const bool gotBuffer = BufferedOutputStream::Next(
+           reinterpret_cast<void **>(&outputBuffer),
+           &outputSize);
+       if (!gotBuffer) {
+         throw std::runtime_error(
+             "Failed to get next output buffer from output stream.");
+       }
+       outputPosition = 0;
+     }
+     // bounded by the room left in the buffer and the bytes still pending
+     const int room = outputSize - outputPosition;
+     const int pending = size - written;
+     const int step = std::min(room, pending);
+     memcpy(outputBuffer + outputPosition,
+            data + written,
+            static_cast<size_t>(step));
+     written += step;
+     outputPosition += step;
+   }
+ }
+
void CompressionStreamBase::ensureHeader() {
// adjust 3 bytes for the compression header
- if (outputPosition + 3 >= outputSize) {
- int newPosition = outputPosition + 3 - outputSize;
- if (!BufferedOutputStream::Next(
- reinterpret_cast<void **>(&outputBuffer),
- &outputSize)) {
+ for (uint32_t i = 0; i < HEADER_SIZE; ++i) {
+ if (outputPosition >= outputSize) {
+ if (!BufferedOutputStream::Next(
+ reinterpret_cast<void **>(&outputBuffer),
+ &outputSize)) {
throw std::runtime_error(
"Failed to get next output buffer from output stream.");
+ }
+ outputPosition = 0;
}
- outputPosition = newPosition;
- } else {
- outputPosition += 3;
+ header[i] = outputBuffer + outputPosition;
+ ++outputPosition;
}
}
@@ -200,22 +235,20 @@ namespace orc {
if (bufferSize != 0) {
ensureHeader();
+ uint64_t preSize = getSize();
uint64_t totalCompressedSize = doStreamingCompression();
-
- char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
- writeHeader(header, static_cast<size_t>(bufferSize), true);
- memcpy(
- header + 3,
- rawInputBuffer.data(),
- static_cast<size_t>(bufferSize));
-
- int backup = static_cast<int>(totalCompressedSize) - bufferSize;
- BufferedOutputStream::BackUp(backup);
- outputPosition -= backup;
- outputSize -= backup;
+ writeHeader(static_cast<size_t>(bufferSize), true);
+ // reset output buffer
+ outputBuffer = nullptr;
+ outputPosition = outputSize = 0;
+ uint64_t backup = getSize() - preSize;
+ BufferedOutputStream::BackUp(static_cast<int>(backup));
+
+ // copy raw input buffer into block buffer
+ writeData(rawInputBuffer.data(), bufferSize);
} else {
- writeHeader(header, totalCompressedSize, false);
+ writeHeader(totalCompressedSize, false);
}
}
@@ -987,41 +1020,18 @@ DIAGNOSTIC_POP
const unsigned char * dataToWrite = nullptr;
int totalSizeToWrite = 0;
- char * header = outputBuffer + outputPosition - 3;
if (totalCompressedSize >= static_cast<size_t>(bufferSize)) {
- writeHeader(header, static_cast<size_t>(bufferSize), true);
+ writeHeader(static_cast<size_t>(bufferSize), true);
dataToWrite = rawInputBuffer.data();
totalSizeToWrite = bufferSize;
} else {
- writeHeader(header, totalCompressedSize, false);
+ writeHeader(totalCompressedSize, false);
dataToWrite = compressorBuffer.data();
totalSizeToWrite = static_cast<int>(totalCompressedSize);
}
- char * dst = header + 3;
- while (totalSizeToWrite > 0) {
- if (outputPosition == outputSize) {
- if (!BufferedOutputStream::Next(reinterpret_cast<void
**>(&outputBuffer),
- &outputSize)) {
- throw std::logic_error(
- "Failed to get next output buffer from output stream.");
- }
- outputPosition = 0;
- dst = outputBuffer;
- } else if (outputPosition > outputSize) {
- // this will unlikely happen, but we have seen a few on zstd v1.1.0
- throw std::logic_error("Write to an out-of-bound place!");
- }
-
- int sizeToWrite = std::min(totalSizeToWrite, outputSize -
outputPosition);
- std::memcpy(dst, dataToWrite, static_cast<size_t>(sizeToWrite));
-
- outputPosition += sizeToWrite;
- dataToWrite += sizeToWrite;
- totalSizeToWrite -= sizeToWrite;
- dst += sizeToWrite;
- }
+ writeData(dataToWrite, totalSizeToWrite);
}
*data = rawInputBuffer.data();
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index 4485e1b29..686cb1c3f 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -37,7 +37,7 @@ namespace orc {
: outputStream(outStream),
blockSize(blockSize_),
metrics(metrics_) {
- dataBuffer.reset(new DataBuffer<char>(pool));
+ dataBuffer.reset(new BlockBuffer(pool, blockSize));
dataBuffer->reserve(capacity_);
}
@@ -46,16 +46,12 @@ namespace orc {
}
bool BufferedOutputStream::Next(void** buffer, int* size) {
- *size = static_cast<int>(blockSize);
- uint64_t oldSize = dataBuffer->size();
- uint64_t newSize = oldSize + blockSize;
- uint64_t newCapacity = dataBuffer->capacity();
- while (newCapacity < newSize) {
- newCapacity += dataBuffer->capacity();
+ auto block = dataBuffer->getNextBlock();
+ if (block.data == nullptr) {
+ throw std::logic_error("Failed to get next buffer from block buffer.");
}
- dataBuffer->reserve(newCapacity);
- dataBuffer->resize(newSize);
- *buffer = dataBuffer->data() + oldSize;
+ *buffer = block.data;
+ *size = static_cast<int>(block.size);
return true;
}
@@ -95,9 +91,11 @@ namespace orc {
uint64_t BufferedOutputStream::flush() {
uint64_t dataSize = dataBuffer->size();
+ // flush data buffer into outputStream
+ if (dataSize > 0)
{
- SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
- outputStream->write(dataBuffer->data(), dataSize);
+ SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+ dataBuffer->writeTo(outputStream, metrics);
}
dataBuffer->resize(0);
return dataSize;
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index c49b769d5..0c76be881 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -20,6 +20,7 @@
#define ORC_OUTPUTSTREAM_HH
#include "Adaptor.hh"
+#include "BlockBuffer.hh"
#include "orc/OrcFile.hh"
#include "wrap/zero-copy-stream-wrapper.h"
@@ -49,7 +50,7 @@ DIAGNOSTIC_PUSH
class BufferedOutputStream: public
google::protobuf::io::ZeroCopyOutputStream {
private:
OutputStream * outputStream;
- std::unique_ptr<DataBuffer<char> > dataBuffer;
+ std::unique_ptr<BlockBuffer> dataBuffer;
uint64_t blockSize;
WriterMetrics* metrics;
diff --git a/c++/test/MemoryOutputStream.hh b/c++/test/MemoryOutputStream.hh
index 6b23a34eb..c05c6239a 100644
--- a/c++/test/MemoryOutputStream.hh
+++ b/c++/test/MemoryOutputStream.hh
@@ -31,6 +31,7 @@ namespace orc {
MemoryOutputStream(ssize_t capacity) : name("MemoryOutputStream") {
data = new char[capacity];
length = 0;
+ naturalWriteSize = 2048;
}
virtual ~MemoryOutputStream() override;
diff --git a/c++/test/TestBlockBuffer.cc b/c++/test/TestBlockBuffer.cc
index c638490e6..2f81cbb1e 100644
--- a/c++/test/TestBlockBuffer.cc
+++ b/c++/test/TestBlockBuffer.cc
@@ -17,10 +17,12 @@
*/
#include "BlockBuffer.hh"
+#include "MemoryOutputStream.hh"
#include "orc/OrcFile.hh"
#include "wrap/gtest-wrapper.h"
namespace orc {
+ const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
TEST(TestBlockBuffer, size_and_capacity) {
MemoryPool* pool = getDefaultPool();
@@ -78,4 +80,42 @@ namespace orc {
}
}
}
+
+ // Fill a BlockBuffer of the given block size with a deterministic letter
+ // pattern, flush it through BlockBuffer::writeTo into a MemoryOutputStream
+ // and check that the stream received the blocks' bytes in order.
+ void writeToOutputStream(uint64_t blockSize) {
+   MemoryOutputStream outputStream(DEFAULT_MEM_STREAM_SIZE);
+   MemoryPool* pool = getDefaultPool();
+   BlockBuffer buffer(*pool, blockSize);
+   uint64_t totalBufferSize = 10240;
+   while (buffer.size() < totalBufferSize) {
+     BlockBuffer::Block block = buffer.getNextBlock();
+     uint64_t blockNumber = buffer.getBlockNumber();
+     // upper-case letters for even block numbers, lower-case for odd ones
+     const char base = (blockNumber % 2 == 0) ? 'A' : 'a';
+     for (uint64_t j = 0; j < block.size; ++j) {
+       block.data[j] = static_cast<char>(base + (blockNumber + j) % 26);
+     }
+   }
+   buffer.resize(totalBufferSize);
+   // flush data buffer into output stream
+   buffer.writeTo(&outputStream, nullptr);
+   // verify that the stream content matches the blocks byte by byte
+   uint64_t pos = 0;
+   for (uint64_t i = 0; i < buffer.getBlockNumber(); ++i) {
+     BlockBuffer::Block block = buffer.getBlock(i);
+     for (uint64_t j = 0; j < block.size; ++j) {
+       EXPECT_EQ(outputStream.getData()[pos], block.data[j]);
+       ++pos;
+     }
+   }
+ }
+
+ TEST(TestBlockBuffer, write_to) {
+   // block sizes below, equal to and above the natural write size (2048)
+   // of MemoryOutputStream, in that order
+   const uint64_t blockSizes[] = {1024, 2048, 4096};
+   for (uint64_t blockSize : blockSizes) {
+     writeToOutputStream(blockSize);
+   }
+ }
}