This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new 4da1acb3f ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream
4da1acb3f is described below

commit 4da1acb3fe9529db69687c047a2e3df5e52f6b32
Author: coderex2522 <[email protected]>
AuthorDate: Wed Oct 26 09:00:55 2022 -0700

    ORC-1286: [C++] replace DataBuffer with BlockBuffer in class BufferedOutputStream
    
    ### What changes were proposed in this pull request?
    This PR reduces the excessive memory held by BufferedOutputStream and
    refactors the data-writing logic in class CompressionStreamBase.
    
    ### Why are the changes needed?
    This patch replaces DataBuffer with BlockBuffer in class
    BufferedOutputStream to fix [issue #1240](https://github.com/apache/orc/issues/1240).
    
    ### How was this patch tested?
    The existing unit tests in TestBufferedOutputStream.cc and
    TestCompression.cc cover this patch; a new unit test,
    TestBlockBuffer.write_to, is added.
    
    Closes #1275 from coderex2522/ORC-1280-PART2.
    
    Authored-by: coderex2522 <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 c++/src/BlockBuffer.cc         |  53 +++++++++++++++++++
 c++/src/BlockBuffer.hh         |   9 ++++
 c++/src/Compression.cc         | 114 ++++++++++++++++++++++-------------------
 c++/src/io/OutputStream.cc     |  22 ++++----
 c++/src/io/OutputStream.hh     |   3 +-
 c++/test/MemoryOutputStream.hh |   1 +
 c++/test/TestBlockBuffer.cc    |  40 +++++++++++++++
 7 files changed, 177 insertions(+), 65 deletions(-)

diff --git a/c++/src/BlockBuffer.cc b/c++/src/BlockBuffer.cc
index aac0d4798..defd86ae9 100644
--- a/c++/src/BlockBuffer.cc
+++ b/c++/src/BlockBuffer.cc
@@ -17,6 +17,8 @@
  */
 
 #include "BlockBuffer.hh"
+#include "orc/Writer.hh"
+#include "orc/OrcFile.hh"
 
 #include <algorithm>
 
@@ -82,4 +84,66 @@ namespace orc {
       }
     }
   }
+
+  // Write the first currentSize bytes to output, coalescing blocks into
+  // writes of at most min(getNaturalWriteSize(), 1 GiB) bytes, and add the
+  // number of write() calls to metrics->IOCount when metrics is non-null.
+  void BlockBuffer::writeTo(OutputStream* output,
+                            WriterMetrics* metrics) {
+    if (currentSize == 0) {
+      return;
+    }
+    static constexpr uint64_t MAX_CHUNK_SIZE = 1024 * 1024 * 1024;
+    uint64_t chunkSize =
+      std::min(output->getNaturalWriteSize(), MAX_CHUNK_SIZE);
+    if (chunkSize == 0) {
+      throw std::logic_error("Natural write size cannot be zero");
+    }
+    uint64_t ioCount = 0;
+    uint64_t blockNumber = getBlockNumber();
+    // a single block holds all currentSize bytes: write it directly
+    if (blockNumber == 1 && currentSize <= chunkSize) {
+      Block block = getBlock(0);
+      output->write(block.data, block.size);
+      ++ioCount;
+    } else {
+      char* chunk = memoryPool.malloc(chunkSize);
+      // release the staging chunk on every exit path, including when
+      // output->write() throws
+      struct ChunkGuard {
+        MemoryPool& pool;
+        char* ptr;
+        ~ChunkGuard() { pool.free(ptr); }
+      } chunkGuard{memoryPool, chunk};
+      uint64_t chunkOffset = 0;
+      for (uint64_t i = 0; i < blockNumber; ++i) {
+        Block block = getBlock(i);
+        uint64_t blockOffset = 0;
+        while (blockOffset < block.size) {
+          // copy as much of the current block as fits into the chunk
+          uint64_t copySize =
+            std::min(chunkSize - chunkOffset, block.size - blockOffset);
+          memcpy(chunk + chunkOffset, block.data + blockOffset, copySize);
+          chunkOffset += copySize;
+          blockOffset += copySize;
+
+          // chunk is full: flush it
+          if (chunkOffset >= chunkSize) {
+            output->write(chunk, chunkSize);
+            chunkOffset = 0;
+            ++ioCount;
+          }
+        }
+      }
+      // flush the trailing partial chunk
+      if (chunkOffset != 0) {
+        output->write(chunk, chunkOffset);
+        ++ioCount;
+      }
+    }
+
+    if (metrics != nullptr) {
+      metrics->IOCount.fetch_add(ioCount);
+    }
+  }
 } // namespace orc
diff --git a/c++/src/BlockBuffer.hh b/c++/src/BlockBuffer.hh
index bb22b8a02..2869cce9b 100644
--- a/c++/src/BlockBuffer.hh
+++ b/c++/src/BlockBuffer.hh
@@ -25,6 +25,8 @@
 
 namespace orc {
 
+  class OutputStream;
+  struct WriterMetrics;
   /**
    * BlockBuffer implements a memory allocation policy based on
    * equal-length blocks. BlockBuffer will reserve multiple blocks
@@ -110,6 +112,13 @@ namespace orc {
      * @param newCapacity new capacity of BlockBuffer
      */
     void reserve(uint64_t newCapacity);
+    /**
+     * Write the BlockBuffer content into OutputStream
+     * @param output the output stream to write to
+     * @param metrics the metrics of the writer
+     */
+    void writeTo(OutputStream* output,
+                 WriterMetrics* metrics);
   };
 }  // namespace orc
 
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index aa81deacf..5e256c5cd 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -24,6 +24,7 @@
 #include "lz4.h"
 
 #include <algorithm>
+#include <array>
 #include <iomanip>
 #include <iostream>
 #include <sstream>
@@ -68,10 +69,12 @@ namespace orc {
     virtual uint64_t getSize() const override;
 
   protected:
-    void writeHeader(char * buffer, size_t compressedSize, bool original) {
-      buffer[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 
0));
-      buffer[1] = static_cast<char>(compressedSize >> 7);
-      buffer[2] = static_cast<char>(compressedSize >> 15);
+    void writeData(const unsigned char* data, int size);
+
+    void writeHeader(size_t compressedSize, bool original) {
+      *header[0] = static_cast<char>((compressedSize << 1) + (original ? 1 : 
0));
+      *header[1] = static_cast<char>(compressedSize >> 7);
+      *header[2] = static_cast<char>(compressedSize >> 15);
     }
 
     // ensure enough room for compression block header
@@ -94,6 +97,10 @@ namespace orc {
 
     // Compress output buffer size
     int outputSize;
+
+    // Compression block header pointer array
+    static const uint32_t HEADER_SIZE = 3;
+    std::array<char*, HEADER_SIZE> header;
   };
 
   CompressionStreamBase::CompressionStreamBase(OutputStream * outStream,
@@ -113,7 +120,8 @@ namespace orc {
                                                 bufferSize(0),
                                                 outputPosition(0),
                                                 outputSize(0) {
-    // PASS
+    // init header pointer array
+    header.fill(nullptr);
   }
 
   void CompressionStreamBase::BackUp(int count) {
@@ -145,19 +153,46 @@ namespace orc {
            static_cast<uint64_t>(outputSize - outputPosition);
   }
 
+  // copy size bytes of data into outputBuffer, calling Next() when exhausted
+  void CompressionStreamBase::writeData(const unsigned char* data, int size) {
+    int offset = 0;  // number of bytes of data copied so far
+    while (offset < size) {  // does nothing when size <= 0
+      if (outputPosition == outputSize) {  // current buffer is exhausted
+        if (!BufferedOutputStream::Next(
+          reinterpret_cast<void **>(&outputBuffer),
+          &outputSize)) {
+            throw std::runtime_error(
+                "Failed to get next output buffer from output stream.");
+        }
+        outputPosition = 0;  // continue at the start of the new buffer
+      } else  if (outputPosition > outputSize) {
+        // not expected; guards against writing past the end of outputBuffer
+        throw std::logic_error(
+            "Write to an out-of-bound place during compression!");
+      }
+      int currentSize = std::min(outputSize - outputPosition, size - offset);
+      memcpy(outputBuffer + outputPosition,
+             data + offset,
+             static_cast<size_t>(currentSize));
+      offset += currentSize;
+      outputPosition += currentSize;
+    }
+  }
+
   void CompressionStreamBase::ensureHeader() {
     // adjust 3 bytes for the compression header
-    if (outputPosition + 3 >= outputSize) {
-      int newPosition = outputPosition + 3 - outputSize;
-      if (!BufferedOutputStream::Next(
-        reinterpret_cast<void **>(&outputBuffer),
-        &outputSize)) {
+    for (uint32_t i = 0; i < HEADER_SIZE; ++i) {
+      if (outputPosition >= outputSize) {
+        if (!BufferedOutputStream::Next(
+          reinterpret_cast<void **>(&outputBuffer),
+          &outputSize)) {
         throw std::runtime_error(
           "Failed to get next output buffer from output stream.");
+        }
+        outputPosition = 0;
       }
-      outputPosition = newPosition;
-    } else {
-      outputPosition += 3;
+      header[i] = outputBuffer + outputPosition;
+      ++outputPosition;
     }
   }
 
@@ -200,22 +235,20 @@ namespace orc {
     if (bufferSize != 0) {
       ensureHeader();
 
+      uint64_t preSize = getSize();
       uint64_t totalCompressedSize = doStreamingCompression();
-
-      char * header = outputBuffer + outputPosition - totalCompressedSize - 3;
       if (totalCompressedSize >= static_cast<unsigned long>(bufferSize)) {
-        writeHeader(header, static_cast<size_t>(bufferSize), true);
-        memcpy(
-          header + 3,
-          rawInputBuffer.data(),
-          static_cast<size_t>(bufferSize));
-
-        int backup = static_cast<int>(totalCompressedSize) - bufferSize;
-        BufferedOutputStream::BackUp(backup);
-        outputPosition -= backup;
-        outputSize -= backup;
+        writeHeader(static_cast<size_t>(bufferSize), true);
+        // reset output buffer
+        outputBuffer = nullptr;
+        outputPosition = outputSize = 0;
+        uint64_t backup = getSize() - preSize;
+        BufferedOutputStream::BackUp(static_cast<int>(backup));
+
+        // copy raw input buffer into block buffer
+        writeData(rawInputBuffer.data(), bufferSize);
       } else {
-        writeHeader(header, totalCompressedSize, false);
+        writeHeader(totalCompressedSize, false);
       }
     }
 
@@ -987,41 +1020,18 @@ DIAGNOSTIC_POP
 
       const unsigned char * dataToWrite = nullptr;
       int totalSizeToWrite = 0;
-      char * header = outputBuffer + outputPosition - 3;
 
       if (totalCompressedSize >= static_cast<size_t>(bufferSize)) {
-        writeHeader(header, static_cast<size_t>(bufferSize), true);
+        writeHeader(static_cast<size_t>(bufferSize), true);
         dataToWrite = rawInputBuffer.data();
         totalSizeToWrite = bufferSize;
       } else {
-        writeHeader(header, totalCompressedSize, false);
+        writeHeader(totalCompressedSize, false);
         dataToWrite = compressorBuffer.data();
         totalSizeToWrite = static_cast<int>(totalCompressedSize);
       }
 
-      char * dst = header + 3;
-      while (totalSizeToWrite > 0) {
-        if (outputPosition == outputSize) {
-          if (!BufferedOutputStream::Next(reinterpret_cast<void 
**>(&outputBuffer),
-                                          &outputSize)) {
-            throw std::logic_error(
-              "Failed to get next output buffer from output stream.");
-          }
-          outputPosition = 0;
-          dst = outputBuffer;
-        } else if (outputPosition > outputSize) {
-          // this will unlikely happen, but we have seen a few on zstd v1.1.0
-          throw std::logic_error("Write to an out-of-bound place!");
-        }
-
-        int sizeToWrite = std::min(totalSizeToWrite, outputSize - 
outputPosition);
-        std::memcpy(dst, dataToWrite, static_cast<size_t>(sizeToWrite));
-
-        outputPosition += sizeToWrite;
-        dataToWrite += sizeToWrite;
-        totalSizeToWrite -= sizeToWrite;
-        dst += sizeToWrite;
-      }
+      writeData(dataToWrite, totalSizeToWrite);
     }
 
     *data = rawInputBuffer.data();
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index 4485e1b29..686cb1c3f 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -37,7 +37,7 @@ namespace orc {
                                     : outputStream(outStream),
                                       blockSize(blockSize_),
                                       metrics(metrics_) {
-    dataBuffer.reset(new DataBuffer<char>(pool));
+    dataBuffer.reset(new BlockBuffer(pool, blockSize));
     dataBuffer->reserve(capacity_);
   }
 
@@ -46,16 +46,12 @@ namespace orc {
   }
 
   bool BufferedOutputStream::Next(void** buffer, int* size) {
-    *size = static_cast<int>(blockSize);
-    uint64_t oldSize = dataBuffer->size();
-    uint64_t newSize = oldSize + blockSize;
-    uint64_t newCapacity = dataBuffer->capacity();
-    while (newCapacity < newSize) {
-      newCapacity += dataBuffer->capacity();
+    auto block = dataBuffer->getNextBlock();
+    if (block.data == nullptr) {
+      throw std::logic_error("Failed to get next buffer from block buffer.");
     }
-    dataBuffer->reserve(newCapacity);
-    dataBuffer->resize(newSize);
-    *buffer = dataBuffer->data() + oldSize;
+    *buffer = block.data;
+    *size = static_cast<int>(block.size);
     return true;
   }
 
@@ -95,9 +91,11 @@ namespace orc {
 
   uint64_t BufferedOutputStream::flush() {
     uint64_t dataSize = dataBuffer->size();
+    // flush data buffer into outputStream
+    if (dataSize > 0)
     {
-      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, IOCount);
-      outputStream->write(dataBuffer->data(), dataSize);
+      SCOPED_STOPWATCH(metrics, IOBlockingLatencyUs, nullptr);
+      dataBuffer->writeTo(outputStream, metrics);
     }
     dataBuffer->resize(0);
     return dataSize;
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index c49b769d5..0c76be881 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -20,6 +20,7 @@
 #define ORC_OUTPUTSTREAM_HH
 
 #include "Adaptor.hh"
+#include "BlockBuffer.hh"
 #include "orc/OrcFile.hh"
 #include "wrap/zero-copy-stream-wrapper.h"
 
@@ -49,7 +50,7 @@ DIAGNOSTIC_PUSH
   class BufferedOutputStream: public 
google::protobuf::io::ZeroCopyOutputStream {
   private:
     OutputStream * outputStream;
-    std::unique_ptr<DataBuffer<char> > dataBuffer;
+    std::unique_ptr<BlockBuffer> dataBuffer;
     uint64_t blockSize;
     WriterMetrics* metrics;
 
diff --git a/c++/test/MemoryOutputStream.hh b/c++/test/MemoryOutputStream.hh
index 6b23a34eb..c05c6239a 100644
--- a/c++/test/MemoryOutputStream.hh
+++ b/c++/test/MemoryOutputStream.hh
@@ -31,6 +31,7 @@ namespace orc {
     MemoryOutputStream(ssize_t capacity) : name("MemoryOutputStream") {
       data = new char[capacity];
       length = 0;
+      naturalWriteSize = 2048;
     }
 
     virtual ~MemoryOutputStream() override;
diff --git a/c++/test/TestBlockBuffer.cc b/c++/test/TestBlockBuffer.cc
index c638490e6..2f81cbb1e 100644
--- a/c++/test/TestBlockBuffer.cc
+++ b/c++/test/TestBlockBuffer.cc
@@ -17,10 +17,12 @@
  */
 
 #include "BlockBuffer.hh"
+#include "MemoryOutputStream.hh"
 #include "orc/OrcFile.hh"
 #include "wrap/gtest-wrapper.h"
 
 namespace orc {
+  const int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024; // 10M
 
   TEST(TestBlockBuffer, size_and_capacity) {
     MemoryPool* pool = getDefaultPool();
@@ -78,4 +80,50 @@ namespace orc {
       }
     }
   }
+
+  // Fill a BlockBuffer of the given block size with a deterministic pattern,
+  // flush it with writeTo() and check that exactly the buffered bytes reach
+  // the output stream, in order.
+  static void writeToOutputStream(uint64_t blockSize,
+                                  uint64_t totalBufferSize = 10240) {
+    MemoryOutputStream outputStream(DEFAULT_MEM_STREAM_SIZE);
+    MemoryPool* pool = getDefaultPool();
+    BlockBuffer buffer(*pool, blockSize);
+    while (buffer.size() < totalBufferSize) {
+      BlockBuffer::Block block = buffer.getNextBlock();
+      uint64_t blockNumber = buffer.getBlockNumber();
+      for (uint64_t j = 0; j < block.size; ++j) {
+        if (blockNumber % 2 == 0) {
+          block.data[j] = static_cast<char>('A' + (blockNumber + j) % 26);
+        } else {
+          block.data[j] = static_cast<char>('a' + (blockNumber + j) % 26);
+        }
+      }
+    }
+    buffer.resize(totalBufferSize);
+    // flush data buffer into output stream
+    buffer.writeTo(&outputStream, nullptr);
+    // writeTo must emit exactly the buffered bytes, no more and no fewer
+    ASSERT_EQ(totalBufferSize, outputStream.getLength());
+    // verify data buffer
+    uint64_t dataIndex = 0;
+    for (uint64_t i = 0; i < buffer.getBlockNumber(); ++i) {
+      BlockBuffer::Block block = buffer.getBlock(i);
+      for (uint64_t j = 0; j < block.size; ++j) {
+        EXPECT_EQ(outputStream.getData()[dataIndex++], block.data[j]);
+      }
+    }
+  }
+
+  TEST(TestBlockBuffer, write_to) {
+    // test block size < natural write size
+    writeToOutputStream(1024);
+    // test block size = natural write size
+    writeToOutputStream(2048);
+    // test block size > natural write size
+    writeToOutputStream(4096);
+    // test one partial block smaller than the natural write size; this
+    // should exercise writeTo's single-block path
+    writeToOutputStream(4096, 1000);
+  }
 }

Reply via email to