This is an automated email from the ASF dual-hosted git repository.

ffacs pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new e8402b233 ORC-1711: [C++] Introduce a memory block size parameter for 
writer option
e8402b233 is described below

commit e8402b233fd671f8fe42ec7eff24ee4fea51256b
Author: luffy-zh <[email protected]>
AuthorDate: Wed May 22 19:51:30 2024 +0800

    ORC-1711: [C++] Introduce a memory block size parameter for writer option
    
    ## What changes were proposed in this pull request?
    1. Add a memory block size parameter to the writer options, which sets the 
initial block size of the compression stream's input buffer.
    2. The compression stream keeps accumulating data in the input buffer and 
compresses it only once the buffer size reaches the compression block size, 
allowing the stream to start with a minimal initial memory footprint.
    
    ## Why are the changes needed?
    This change separates the compression block size from the input buffer 
block size in order to resolve 
[ORC-1711](https://issues.apache.org/jira/browse/ORC-1711).
    
    ## How was this patch tested?
    The unit tests in TestCompression.cc and TestWriter.cc cover this patch.
    
    Closes #1932 from luffy-zh/ORC-1711.
    
    Authored-by: luffy-zh <[email protected]>
    Signed-off-by: ffacs <[email protected]>
---
 c++/include/orc/Writer.hh          |  13 +++
 c++/src/ByteRLE.cc                 |  11 +--
 c++/src/ColumnWriter.cc            |  10 +--
 c++/src/Compression.cc             |  89 +++++++++++++-------
 c++/src/Compression.hh             |  15 ++--
 c++/src/RLE.cc                     |  10 ++-
 c++/src/Writer.cc                  |  24 +++++-
 c++/src/io/OutputStream.cc         |  15 ++--
 c++/src/io/OutputStream.hh         |   1 +
 c++/test/TestByteRle.cc            |   2 +-
 c++/test/TestCompression.cc        |  26 +++---
 c++/test/TestDictionaryEncoding.cc |   7 +-
 c++/test/TestPredicatePushdown.cc  |   2 +
 c++/test/TestReader.cc             |   4 +
 c++/test/TestWriter.cc             | 163 ++++++++++++++++++++++---------------
 15 files changed, 253 insertions(+), 139 deletions(-)

diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
index 7968fbce7..b560627c4 100644
--- a/c++/include/orc/Writer.hh
+++ b/c++/include/orc/Writer.hh
@@ -277,6 +277,19 @@ namespace orc {
      * @return if not set, return default value which is 1 MB.
      */
     uint64_t getOutputBufferCapacity() const;
+
+    /**
+     * Set the initial block size of original input buffer in the class 
CompressionStream.
+     * the input buffer is used to store raw data before compression, while 
the output buffer is
+     * dedicated to holding compressed data
+     */
+    WriterOptions& setMemoryBlockSize(uint64_t capacity);
+
+    /**
+     * Get the initial block size of original input buffer in the class 
CompressionStream.
+     * @return if not set, return default value which is 64 KB.
+     */
+    uint64_t getMemoryBlockSize() const;
   };
 
   class Writer {
diff --git a/c++/src/ByteRLE.cc b/c++/src/ByteRLE.cc
index bdbaad1da..02a3e4041 100644
--- a/c++/src/ByteRLE.cc
+++ b/c++/src/ByteRLE.cc
@@ -186,16 +186,17 @@ namespace orc {
 
   void ByteRleEncoderImpl::recordPosition(PositionRecorder* recorder) const {
     uint64_t flushedSize = outputStream->getSize();
-    uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
+    uint64_t unusedBufferSize = static_cast<uint64_t>(bufferLength - 
bufferPosition);
     if (outputStream->isCompressed()) {
       // start of the compression chunk in the stream
       recorder->add(flushedSize);
-      // number of decompressed bytes that need to be consumed
-      recorder->add(unflushedSize);
+      // There are multiple blocks in the input buffer, but bufferPosition 
only records the
+      // effective length of the last block. We need rawInputBufferSize to 
record the total length
+      // of all variable blocks.
+      recorder->add(outputStream->getRawInputBufferSize() - unusedBufferSize);
     } else {
-      flushedSize -= static_cast<uint64_t>(bufferLength);
       // byte offset of the RLE run’s start location
-      recorder->add(flushedSize + unflushedSize);
+      recorder->add(flushedSize - unusedBufferSize);
     }
     recorder->add(static_cast<uint64_t>(numLiterals));
   }
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
index 05ffd3a2d..109eef889 100644
--- a/c++/src/ColumnWriter.cc
+++ b/c++/src/ColumnWriter.cc
@@ -48,11 +48,11 @@ namespace orc {
     // In the future, we can decide compression strategy and modifier
     // based on stream kind. But for now we just use the setting from
     // WriterOption
-    return createCompressor(options_.getCompression(), outStream_,
-                            options_.getCompressionStrategy(),
-                            // BufferedOutputStream initial capacity
-                            options_.getOutputBufferCapacity(), 
options_.getCompressionBlockSize(),
-                            *options_.getMemoryPool(), 
options_.getWriterMetrics());
+    return createCompressor(
+        options_.getCompression(), outStream_, 
options_.getCompressionStrategy(),
+        // BufferedOutputStream initial capacity
+        options_.getOutputBufferCapacity(), options_.getCompressionBlockSize(),
+        options_.getMemoryBlockSize(), *options_.getMemoryPool(), 
options_.getWriterMetrics());
   }
 
   std::unique_ptr<StreamsFactory> createStreamsFactory(const WriterOptions& 
options,
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index a1595da49..3fba1d2ee 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -52,7 +52,8 @@ namespace orc {
   class CompressionStreamBase : public BufferedOutputStream {
    public:
     CompressionStreamBase(OutputStream* outStream, int compressionLevel, 
uint64_t capacity,
-                          uint64_t blockSize, MemoryPool& pool, WriterMetrics* 
metrics);
+                          uint64_t compressionBlockSize, uint64_t 
memoryBlockSize, MemoryPool& pool,
+                          WriterMetrics* metrics);
 
     virtual bool Next(void** data, int* size) override = 0;
     virtual void BackUp(int count) override = 0;
@@ -65,6 +66,7 @@ namespace orc {
       return true;
     }
     virtual uint64_t getSize() const override;
+    virtual uint64_t getRawInputBufferSize() const override = 0;
 
    protected:
     void writeData(const unsigned char* data, int size);
@@ -96,17 +98,22 @@ namespace orc {
     // Compression block header pointer array
     static const uint32_t HEADER_SIZE = 3;
     std::array<char*, HEADER_SIZE> header;
+
+    // Compression block size
+    uint64_t compressionBlockSize;
   };
 
   CompressionStreamBase::CompressionStreamBase(OutputStream* outStream, int 
compressionLevel,
-                                               uint64_t capacity, uint64_t 
blockSize,
-                                               MemoryPool& pool, 
WriterMetrics* metrics)
-      : BufferedOutputStream(pool, outStream, capacity, blockSize, metrics),
+                                               uint64_t capacity, uint64_t 
compressionBlockSize,
+                                               uint64_t memoryBlockSize, 
MemoryPool& pool,
+                                               WriterMetrics* metrics)
+      : BufferedOutputStream(pool, outStream, capacity, memoryBlockSize, 
metrics),
         level(compressionLevel),
         outputBuffer(nullptr),
         bufferSize(0),
         outputPosition(0),
-        outputSize(0) {
+        outputSize(0),
+        compressionBlockSize(compressionBlockSize) {
     // init header pointer array
     header.fill(nullptr);
   }
@@ -155,13 +162,17 @@ namespace orc {
   class CompressionStream : public CompressionStreamBase {
    public:
     CompressionStream(OutputStream* outStream, int compressionLevel, uint64_t 
capacity,
-                      uint64_t blockSize, MemoryPool& pool, WriterMetrics* 
metrics);
+                      uint64_t compressionBlockSize, uint64_t memoryBlockSize, 
MemoryPool& pool,
+                      WriterMetrics* metrics);
 
     virtual bool Next(void** data, int* size) override;
     virtual std::string getName() const override = 0;
     virtual void BackUp(int count) override;
     virtual void suppress() override;
     virtual uint64_t flush() override;
+    uint64_t getRawInputBufferSize() const override {
+      return rawInputBuffer.size();
+    }
 
    protected:
     // return total compressed size
@@ -169,6 +180,8 @@ namespace orc {
 
     // Buffer to hold uncompressed data until user calls Next()
     BlockBuffer rawInputBuffer;
+
+    void compressInternal();
   };
 
   void CompressionStream::BackUp(int count) {
@@ -181,11 +194,7 @@ namespace orc {
   }
 
   uint64_t CompressionStream::flush() {
-    void* data;
-    int size;
-    if (!Next(&data, &size)) {
-      throw std::runtime_error("Failed to flush compression buffer.");
-    }
+    compressInternal();
     BufferedOutputStream::BackUp(outputSize - outputPosition);
     rawInputBuffer.resize(0);
     outputSize = outputPosition = 0;
@@ -200,14 +209,16 @@ namespace orc {
   }
 
   CompressionStream::CompressionStream(OutputStream* outStream, int 
compressionLevel,
-                                       uint64_t capacity, uint64_t blockSize, 
MemoryPool& pool,
+                                       uint64_t capacity, uint64_t 
compressionBlockSize,
+                                       uint64_t memoryBlockSize, MemoryPool& 
pool,
                                        WriterMetrics* metrics)
-      : CompressionStreamBase(outStream, compressionLevel, capacity, 
blockSize, pool, metrics),
-        rawInputBuffer(pool, blockSize) {
+      : CompressionStreamBase(outStream, compressionLevel, capacity, 
compressionBlockSize,
+                              memoryBlockSize, pool, metrics),
+        rawInputBuffer(pool, memoryBlockSize) {
     // PASS
   }
 
-  bool CompressionStream::Next(void** data, int* size) {
+  void CompressionStream::compressInternal() {
     if (rawInputBuffer.size() != 0) {
       ensureHeader();
 
@@ -232,6 +243,20 @@ namespace orc {
       }
       rawInputBuffer.resize(0);
     }
+  }
+
+  bool CompressionStream::Next(void** data, int* size) {
+    if (rawInputBuffer.size() > compressionBlockSize) {
+      std::stringstream ss;
+      ss << "uncompressed data size " << rawInputBuffer.size()
+         << " is larger than compression block size " << compressionBlockSize;
+      throw std::logic_error(ss.str());
+    }
+
+    // compress data in the rawInputBuffer when it is full
+    if (rawInputBuffer.size() == compressionBlockSize) {
+      compressInternal();
+    }
 
     auto block = rawInputBuffer.getNextBlock();
     *data = block.data;
@@ -241,8 +266,9 @@ namespace orc {
 
   class ZlibCompressionStream : public CompressionStream {
    public:
-    ZlibCompressionStream(OutputStream* outStream, int compressionLevel, 
uint64_t capacity,
-                          uint64_t blockSize, MemoryPool& pool, WriterMetrics* 
metrics);
+    ZlibCompressionStream(OutputStream* outStream, int compressionLevel, 
uint64_t bufferCapacity,
+                          uint64_t compressionBlockSize, uint64_t 
memoryBlockSize, MemoryPool& pool,
+                          WriterMetrics* metrics);
 
     virtual ~ZlibCompressionStream() override {
       end();
@@ -260,9 +286,12 @@ namespace orc {
   };
 
   ZlibCompressionStream::ZlibCompressionStream(OutputStream* outStream, int 
compressionLevel,
-                                               uint64_t capacity, uint64_t 
blockSize,
-                                               MemoryPool& pool, 
WriterMetrics* metrics)
-      : CompressionStream(outStream, compressionLevel, capacity, blockSize, 
pool, metrics) {
+                                               uint64_t bufferCapacity,
+                                               uint64_t compressionBlockSize,
+                                               uint64_t memoryBlockSize, 
MemoryPool& pool,
+                                               WriterMetrics* metrics)
+      : CompressionStream(outStream, compressionLevel, bufferCapacity, 
compressionBlockSize,
+                          memoryBlockSize, pool, metrics) {
     init();
   }
 
@@ -904,7 +933,8 @@ namespace orc {
    public:
     BlockCompressionStream(OutputStream* outStream, int compressionLevel, 
uint64_t capacity,
                            uint64_t blockSize, MemoryPool& pool, 
WriterMetrics* metrics)
-        : CompressionStreamBase(outStream, compressionLevel, capacity, 
blockSize, pool, metrics),
+        : CompressionStreamBase(outStream, compressionLevel, capacity, 
blockSize, blockSize, pool,
+                                metrics),
           compressorBuffer(pool),
           rawInputBuffer(pool, blockSize) {
       // PASS
@@ -915,6 +945,9 @@ namespace orc {
     virtual void BackUp(int count) override;
     virtual uint64_t flush() override;
     virtual std::string getName() const override = 0;
+    uint64_t getRawInputBufferSize() const override {
+      return rawInputBuffer.size();
+    }
 
    protected:
     // compresses a block and returns the compressed size
@@ -1189,12 +1222,10 @@ namespace orc {
 
   DIAGNOSTIC_PUSH
 
-  std::unique_ptr<BufferedOutputStream> createCompressor(CompressionKind kind,
-                                                         OutputStream* 
outStream,
-                                                         CompressionStrategy 
strategy,
-                                                         uint64_t 
bufferCapacity,
-                                                         uint64_t 
compressionBlockSize,
-                                                         MemoryPool& pool, 
WriterMetrics* metrics) {
+  std::unique_ptr<BufferedOutputStream> createCompressor(
+      CompressionKind kind, OutputStream* outStream, CompressionStrategy 
strategy,
+      uint64_t bufferCapacity, uint64_t compressionBlockSize, uint64_t 
memoryBlockSize,
+      MemoryPool& pool, WriterMetrics* metrics) {
     switch (static_cast<int64_t>(kind)) {
       case CompressionKind_NONE: {
         return std::make_unique<BufferedOutputStream>(pool, outStream, 
bufferCapacity,
@@ -1203,8 +1234,8 @@ namespace orc {
       case CompressionKind_ZLIB: {
         int level =
             (strategy == CompressionStrategy_SPEED) ? Z_BEST_SPEED + 1 : 
Z_DEFAULT_COMPRESSION;
-        return std::make_unique<ZlibCompressionStream>(outStream, level, 
bufferCapacity,
-                                                       compressionBlockSize, 
pool, metrics);
+        return std::make_unique<ZlibCompressionStream>(
+            outStream, level, bufferCapacity, compressionBlockSize, 
memoryBlockSize, pool, metrics);
       }
       case CompressionKind_ZSTD: {
         int level = (strategy == CompressionStrategy_SPEED) ? 1 : 
ZSTD_CLEVEL_DEFAULT;
diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh
index 55b152dd6..24170c56b 100644
--- a/c++/src/Compression.hh
+++ b/c++/src/Compression.hh
@@ -42,15 +42,16 @@ namespace orc {
    * @param outStream the output stream that is the underlying target
    * @param strategy compression strategy
    * @param bufferCapacity compression stream buffer total capacity
-   * @param compressionBlockSize compression buffer block size
+   * @param compressionBlockSize compression is triggered when the original 
input buffer size
+   * reaches this size
+   * @param memoryBlockSize the block size for original input buffer
    * @param pool the memory pool
+   * @param metrics the writer metrics
    */
-  std::unique_ptr<BufferedOutputStream> createCompressor(CompressionKind kind,
-                                                         OutputStream* 
outStream,
-                                                         CompressionStrategy 
strategy,
-                                                         uint64_t 
bufferCapacity,
-                                                         uint64_t 
compressionBlockSize,
-                                                         MemoryPool& pool, 
WriterMetrics* metrics);
+  std::unique_ptr<BufferedOutputStream> createCompressor(
+      CompressionKind kind, OutputStream* outStream, CompressionStrategy 
strategy,
+      uint64_t bufferCapacity, uint64_t compressionBlockSize, uint64_t 
memoryBlockSize,
+      MemoryPool& pool, WriterMetrics* metrics);
 }  // namespace orc
 
 #endif
diff --git a/c++/src/RLE.cc b/c++/src/RLE.cc
index 89aca6a10..23168ff7f 100644
--- a/c++/src/RLE.cc
+++ b/c++/src/RLE.cc
@@ -108,13 +108,15 @@ namespace orc {
 
   void RleEncoder::recordPosition(PositionRecorder* recorder) const {
     uint64_t flushedSize = outputStream->getSize();
-    uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition);
+    uint64_t unusedBufferSize = static_cast<uint64_t>(bufferLength - 
bufferPosition);
     if (outputStream->isCompressed()) {
       recorder->add(flushedSize);
-      recorder->add(unflushedSize);
+      // There are multiple blocks in the input buffer, but bufferPosition 
only records the
+      // effective length of the last block. We need rawInputBufferSize to 
record the total length
+      // of all variable blocks.
+      recorder->add(outputStream->getRawInputBufferSize() - unusedBufferSize);
     } else {
-      flushedSize -= static_cast<uint64_t>(bufferLength);
-      recorder->add(flushedSize + unflushedSize);
+      recorder->add(flushedSize - unusedBufferSize);
     }
     recorder->add(static_cast<uint64_t>(numLiterals));
   }
diff --git a/c++/src/Writer.cc b/c++/src/Writer.cc
index 0d672549c..531b56655 100644
--- a/c++/src/Writer.cc
+++ b/c++/src/Writer.cc
@@ -46,6 +46,7 @@ namespace orc {
     WriterMetrics* metrics;
     bool useTightNumericVector;
     uint64_t outputBufferCapacity;
+    uint64_t memoryBlockSize;
 
     WriterOptionsPrivate() : fileVersion(FileVersion::v_0_12()) {  // default 
to Hive_0_12
       stripeSize = 64 * 1024 * 1024;                               // 64M
@@ -67,6 +68,7 @@ namespace orc {
       metrics = nullptr;
       useTightNumericVector = false;
       outputBufferCapacity = 1024 * 1024;
+      memoryBlockSize = 64 * 1024;  // 64K
     }
   };
 
@@ -287,6 +289,15 @@ namespace orc {
     return privateBits_->outputBufferCapacity;
   }
 
+  WriterOptions& WriterOptions::setMemoryBlockSize(uint64_t capacity) {
+    privateBits_->memoryBlockSize = capacity;
+    return *this;
+  }
+
+  uint64_t WriterOptions::getMemoryBlockSize() const {
+    return privateBits_->memoryBlockSize;
+  }
+
   Writer::~Writer() {
     // PASS
   }
@@ -352,11 +363,16 @@ namespace orc {
 
     useTightNumericVector_ = opts.getUseTightNumericVector();
 
+    if (options_.getCompressionBlockSize() % options_.getMemoryBlockSize() != 
0) {
+      throw std::invalid_argument(
+          "Compression block size must be a multiple of memory block size.");
+    }
+
     // compression stream for stripe footer, file footer and metadata
-    compressionStream_ =
-        createCompressor(options_.getCompression(), outStream_, 
options_.getCompressionStrategy(),
-                         options_.getOutputBufferCapacity(), 
options_.getCompressionBlockSize(),
-                         *options_.getMemoryPool(), 
options_.getWriterMetrics());
+    compressionStream_ = createCompressor(
+        options_.getCompression(), outStream_, 
options_.getCompressionStrategy(),
+        options_.getOutputBufferCapacity(), options_.getCompressionBlockSize(),
+        options_.getMemoryBlockSize(), *options_.getMemoryPool(), 
options_.getWriterMetrics());
 
     // uncompressed stream for post script
     bufferedStream_.reset(new BufferedOutputStream(*options_.getMemoryPool(), 
outStream_,
diff --git a/c++/src/io/OutputStream.cc b/c++/src/io/OutputStream.cc
index 5f9563f30..26b5f7e5d 100644
--- a/c++/src/io/OutputStream.cc
+++ b/c++/src/io/OutputStream.cc
@@ -98,6 +98,10 @@ namespace orc {
     dataBuffer_->resize(0);
   }
 
+  uint64_t BufferedOutputStream::getRawInputBufferSize() const {
+    throw std::logic_error("getRawInputBufferSize is not supported.");
+  }
+
   void AppendOnlyBufferedStream::write(const char* data, size_t size) {
     size_t dataOffset = 0;
     while (size > 0) {
@@ -128,16 +132,17 @@ namespace orc {
 
   void AppendOnlyBufferedStream::recordPosition(PositionRecorder* recorder) 
const {
     uint64_t flushedSize = outStream_->getSize();
-    uint64_t unflushedSize = static_cast<uint64_t>(bufferOffset_);
+    uint64_t unusedBufferSize = static_cast<uint64_t>(bufferLength_ - 
bufferOffset_);
     if (outStream_->isCompressed()) {
       // start of the compression chunk in the stream
       recorder->add(flushedSize);
-      // number of decompressed bytes that need to be consumed
-      recorder->add(unflushedSize);
+      // There are multiple blocks in the input buffer, but bufferPosition 
only records the
+      // effective length of the last block. We need rawInputBufferSize to 
record the total length
+      // of all variable blocks.
+      recorder->add(outStream_->getRawInputBufferSize() - unusedBufferSize);
     } else {
-      flushedSize -= static_cast<uint64_t>(bufferLength_);
       // byte offset of the start location
-      recorder->add(flushedSize + unflushedSize);
+      recorder->add(flushedSize - unusedBufferSize);
     }
   }
 
diff --git a/c++/src/io/OutputStream.hh b/c++/src/io/OutputStream.hh
index c63bc805b..a869632e6 100644
--- a/c++/src/io/OutputStream.hh
+++ b/c++/src/io/OutputStream.hh
@@ -69,6 +69,7 @@ namespace orc {
     virtual uint64_t getSize() const;
     virtual uint64_t flush();
     virtual void suppress();
+    virtual uint64_t getRawInputBufferSize() const;
 
     virtual bool isCompressed() const {
       return false;
diff --git a/c++/test/TestByteRle.cc b/c++/test/TestByteRle.cc
index a822a61d6..7717eab38 100644
--- a/c++/test/TestByteRle.cc
+++ b/c++/test/TestByteRle.cc
@@ -1263,7 +1263,7 @@ namespace orc {
     MemoryOutputStream memStream(capacity);
     std::unique_ptr<ByteRleEncoder> encoder = createBooleanRleEncoder(
         createCompressor(CompressionKind_ZSTD, &memStream, 
CompressionStrategy_COMPRESSION,
-                         capacity, blockSize, *getDefaultPool(), nullptr));
+                         capacity, blockSize, blockSize, *getDefaultPool(), 
nullptr));
     encoder->add(data, numValues, nullptr);
     encoder->flush();
 
diff --git a/c++/test/TestCompression.cc b/c++/test/TestCompression.cc
index a77800a3d..e95a6f016 100644
--- a/c++/test/TestCompression.cc
+++ b/c++/test/TestCompression.cc
@@ -42,12 +42,12 @@ namespace orc {
   }
 
   void decompressAndVerify(const MemoryOutputStream& memStream, 
CompressionKind kind,
-                           const char* data, size_t size, MemoryPool& pool) {
+                           const char* data, size_t size, MemoryPool& pool, 
uint64_t capacity) {
     auto inputStream =
         std::make_unique<SeekableArrayInputStream>(memStream.getData(), 
memStream.getLength());
 
     std::unique_ptr<SeekableInputStream> decompressStream =
-        createDecompressor(kind, std::move(inputStream), 1024, pool, 
getDefaultReaderMetrics());
+        createDecompressor(kind, std::move(inputStream), capacity, pool, 
getDefaultReaderMetrics());
 
     const char* decompressedBuffer;
     int decompressedSize;
@@ -66,7 +66,7 @@ namespace orc {
                          CompressionStrategy strategy, uint64_t capacity, 
uint64_t block,
                          MemoryPool& pool, const char* data, size_t dataSize) {
     std::unique_ptr<BufferedOutputStream> compressStream =
-        createCompressor(kind, outStream, strategy, capacity, block, pool, 
nullptr);
+        createCompressor(kind, outStream, strategy, capacity, block, block, 
pool, nullptr);
 
     size_t pos = 0;
     char* compressBuffer;
@@ -99,7 +99,7 @@ namespace orc {
     char testData[] = "hello world!";
     compressAndVerify(kind, &memStream, CompressionStrategy_SPEED, capacity, 
block, *pool, testData,
                       sizeof(testData));
-    decompressAndVerify(memStream, kind, testData, sizeof(testData), *pool);
+    decompressAndVerify(memStream, kind, testData, sizeof(testData), *pool, 
capacity);
   }
 
   TEST(TestCompression, zlib_compress_original_string) {
@@ -117,7 +117,7 @@ namespace orc {
     char testData[] = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa";
     compressAndVerify(kind, &memStream, CompressionStrategy_SPEED, capacity, 
block, *pool, testData,
                       sizeof(testData));
-    decompressAndVerify(memStream, kind, testData, sizeof(testData), *pool);
+    decompressAndVerify(memStream, kind, testData, sizeof(testData), *pool, 
capacity);
   }
 
   TEST(TestCompression, compress_simple_repeated_string) {
@@ -138,7 +138,7 @@ namespace orc {
     }
     compressAndVerify(kind, &memStream, CompressionStrategy_SPEED, capacity, 
block, *pool, testData,
                       170);
-    decompressAndVerify(memStream, kind, testData, 170, *pool);
+    decompressAndVerify(memStream, kind, testData, 170, *pool, capacity);
   }
 
   TEST(TestCompression, zlib_compress_two_blocks) {
@@ -158,7 +158,7 @@ namespace orc {
     generateRandomData(testData, dataSize, true);
     compressAndVerify(kind, &memStream, CompressionStrategy_SPEED, capacity, 
block, *pool, testData,
                       dataSize);
-    decompressAndVerify(memStream, kind, testData, dataSize, *pool);
+    decompressAndVerify(memStream, kind, testData, dataSize, *pool, capacity);
     delete[] testData;
   }
 
@@ -179,7 +179,7 @@ namespace orc {
     generateRandomData(testData, dataSize, false);
     compressAndVerify(kind, &memStream, CompressionStrategy_SPEED, capacity, 
block, *pool, testData,
                       dataSize);
-    decompressAndVerify(memStream, kind, testData, dataSize, *pool);
+    decompressAndVerify(memStream, kind, testData, dataSize, *pool, capacity);
     delete[] testData;
   }
 
@@ -205,7 +205,7 @@ namespace orc {
     }
 
     std::unique_ptr<BufferedOutputStream> compressStream = createCompressor(
-        kind, &memStream, CompressionStrategy_SPEED, capacity, block, *pool, 
nullptr);
+        kind, &memStream, CompressionStrategy_SPEED, capacity, block, block, 
*pool, nullptr);
 
     EXPECT_TRUE(ps.SerializeToZeroCopyStream(compressStream.get()));
     compressStream->flush();
@@ -213,8 +213,8 @@ namespace orc {
     auto inputStream =
         std::make_unique<SeekableArrayInputStream>(memStream.getData(), 
memStream.getLength());
 
-    std::unique_ptr<SeekableInputStream> decompressStream =
-        createDecompressor(kind, std::move(inputStream), 1024, *pool, 
getDefaultReaderMetrics());
+    std::unique_ptr<SeekableInputStream> decompressStream = createDecompressor(
+        kind, std::move(inputStream), capacity, *pool, 
getDefaultReaderMetrics());
 
     proto::PostScript ps2;
     ps2.ParseFromZeroCopyStream(decompressStream.get());
@@ -312,7 +312,7 @@ namespace orc {
     uint64_t batchSize = 1024, blockSize = 256;
 
     AppendOnlyBufferedStream outStream(createCompressor(
-        kind, &memStream, strategy, DEFAULT_MEM_STREAM_SIZE, blockSize, *pool, 
nullptr));
+        kind, &memStream, strategy, DEFAULT_MEM_STREAM_SIZE, blockSize, 
blockSize, *pool, nullptr));
 
     // write 3 batches of data and record positions between every batch
     size_t row = 0;
@@ -335,7 +335,7 @@ namespace orc {
     auto inputStream =
         std::make_unique<SeekableArrayInputStream>(memStream.getData(), 
memStream.getLength());
     std::unique_ptr<SeekableInputStream> decompressStream = createDecompressor(
-        kind, std::move(inputStream), blockSize, *pool, 
getDefaultReaderMetrics());
+        kind, std::move(inputStream), DEFAULT_MEM_STREAM_SIZE, *pool, 
getDefaultReaderMetrics());
 
     // prepare positions to seek to
     EXPECT_EQ(rowIndexEntry1.positions_size(), 
rowIndexEntry2.positions_size());
diff --git a/c++/test/TestDictionaryEncoding.cc 
b/c++/test/TestDictionaryEncoding.cc
index f3dcaa006..d2eeb6eb2 100644
--- a/c++/test/TestDictionaryEncoding.cc
+++ b/c++/test/TestDictionaryEncoding.cc
@@ -53,6 +53,7 @@ namespace orc {
 
     WriterOptions options;
     options.setStripeSize(1024);
+    options.setMemoryBlockSize(64);
     options.setCompressionBlockSize(1024);
     options.setCompression(CompressionKind_ZLIB);
     options.setMemoryPool(pool);
@@ -109,6 +110,7 @@ namespace orc {
 
     WriterOptions options;
     options.setStripeSize(1024);
+    options.setMemoryBlockSize(64);
     options.setCompressionBlockSize(1024);
     options.setCompression(CompressionKind_ZLIB);
     options.setMemoryPool(pool);
@@ -171,6 +173,7 @@ namespace orc {
     WriterOptions options;
     options.setStripeSize(1024);
     options.setCompressionBlockSize(1024);
+    options.setMemoryBlockSize(64);
     options.setCompression(CompressionKind_ZLIB);
     options.setMemoryPool(pool);
     options.setDictionaryKeySizeThreshold(threshold);
@@ -233,6 +236,7 @@ namespace orc {
 
     WriterOptions options;
     options.setStripeSize(1024);
+    options.setMemoryBlockSize(64);
     options.setCompressionBlockSize(1024);
     options.setCompression(CompressionKind_ZLIB);
     options.setMemoryPool(pool);
@@ -302,7 +306,8 @@ namespace orc {
 
     WriterOptions options;
     options.setStripeSize(1);
-    options.setCompressionBlockSize(1024);
+    options.setMemoryBlockSize(1024);
+    options.setCompressionBlockSize(2 * 1024);
     options.setCompression(CompressionKind_ZLIB);
     options.setMemoryPool(pool);
     options.setDictionaryKeySizeThreshold(threshold);
diff --git a/c++/test/TestPredicatePushdown.cc 
b/c++/test/TestPredicatePushdown.cc
index e949fc289..5c8ed14e7 100644
--- a/c++/test/TestPredicatePushdown.cc
+++ b/c++/test/TestPredicatePushdown.cc
@@ -33,6 +33,7 @@ namespace orc {
     WriterOptions options;
     options.setStripeSize(1024 * 1024)
         .setCompressionBlockSize(1024)
+        .setMemoryBlockSize(64)
         .setCompression(CompressionKind_NONE)
         .setMemoryPool(pool)
         .setRowIndexStride(rowIndexStride);
@@ -510,6 +511,7 @@ namespace orc {
     WriterOptions options;
     options.setStripeSize(1)
         .setCompressionBlockSize(1024)
+        .setMemoryBlockSize(64)
         .setCompression(CompressionKind_NONE)
         .setMemoryPool(pool)
         .setRowIndexStride(1000);
diff --git a/c++/test/TestReader.cc b/c++/test/TestReader.cc
index f709f693f..33a0481b6 100644
--- a/c++/test/TestReader.cc
+++ b/c++/test/TestReader.cc
@@ -166,6 +166,7 @@ namespace orc {
     WriterOptions options;
     options.setStripeSize(1024 * 1024)
         .setCompressionBlockSize(1024)
+        .setMemoryBlockSize(64)
         .setCompression(CompressionKind_NONE)
         .setMemoryPool(pool)
         .setRowIndexStride(1000);
@@ -310,6 +311,7 @@ namespace orc {
     WriterOptions options;
     options.setStripeSize(1024 * 1024)
         .setCompressionBlockSize(1024)
+        .setMemoryBlockSize(64)
         .setCompression(CompressionKind_NONE)
         .setMemoryPool(pool)
         .setRowIndexStride(1000);
@@ -492,6 +494,7 @@ namespace orc {
     WriterOptions options;
     options.setStripeSize(1024 * 1024)
         .setCompressionBlockSize(1024)
+        .setMemoryBlockSize(64)
         .setCompression(CompressionKind_NONE)
         .setMemoryPool(pool)
         .setRowIndexStride(1000);
@@ -673,6 +676,7 @@ namespace orc {
       WriterOptions options;
       options.setStripeSize(1024 * 1024)
           .setCompressionBlockSize(1024)
+          .setMemoryBlockSize(64)
           .setCompression(CompressionKind_NONE)
           .setMemoryPool(pool)
           .setRowIndexStride(1000);
diff --git a/c++/test/TestWriter.cc b/c++/test/TestWriter.cc
index 0d6b368c0..ad669964a 100644
--- a/c++/test/TestWriter.cc
+++ b/c++/test/TestWriter.cc
@@ -41,11 +41,11 @@ namespace orc {
 
   const int DEFAULT_MEM_STREAM_SIZE = 100 * 1024 * 1024;  // 100M
 
-  std::unique_ptr<Writer> createWriter(uint64_t stripeSize, uint64_t compresionblockSize,
-                                       CompressionKind compression, const Type& type,
-                                       MemoryPool* memoryPool, OutputStream* stream,
-                                       FileVersion version, uint64_t stride = 0,
-                                       const std::string& timezone = "GMT",
+  std::unique_ptr<Writer> createWriter(uint64_t stripeSize, uint64_t memoryBlockSize,
+                                       uint64_t compresionblockSize, CompressionKind compression,
+                                       const Type& type, MemoryPool* memoryPool,
+                                       OutputStream* stream, FileVersion version,
+                                       uint64_t stride = 0, const std::string& timezone = "GMT",
                                        bool useTightNumericVector = false) {
     WriterOptions options;
     options.setStripeSize(stripeSize);
@@ -56,6 +56,7 @@ namespace orc {
     options.setFileVersion(version);
     options.setTimezoneName(timezone);
     options.setUseTightNumericVector(useTightNumericVector);
+    options.setMemoryBlockSize(memoryBlockSize);
     return createWriter(type, stream, options);
   }
 
@@ -107,10 +108,11 @@ namespace orc {
 
     uint64_t stripeSize = 16 * 1024;       // 16K
     uint64_t compressionBlockSize = 1024;  // 1k
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, *type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     writer->close();
 
     auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
@@ -135,10 +137,11 @@ namespace orc {
 
     uint64_t stripeSize = 16 * 1024;       // 16K
     uint64_t compressionBlockSize = 1024;  // 1k
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(1024);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* longBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -195,10 +198,11 @@ namespace orc {
 
     uint64_t stripeSize = 1024;            // 1K
     uint64_t compressionBlockSize = 1024;  // 1k
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* longBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -241,13 +245,14 @@ namespace orc {
 
     uint64_t stripeSize = 1024;            // 1K
     uint64_t compressionBlockSize = 1024;  // 1k
+    uint64_t memoryBlockSize = 64;
 
     char dataBuffer[327675];
     uint64_t offset = 0;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     StringVectorBatch* strBatch = 
dynamic_cast<StringVectorBatch*>(structBatch->fields[0]);
@@ -301,6 +306,7 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 655350;
+    uint64_t memoryBlockSize = 64;
 
     std::vector<double> data(rowCount);
     for (uint64_t i = 0; i < rowCount; ++i) {
@@ -308,8 +314,8 @@ namespace orc {
     }
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     DoubleVectorBatch* doubleBatch = 
dynamic_cast<DoubleVectorBatch*>(structBatch->fields[0]);
@@ -356,10 +362,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 65535;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* smallIntBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -406,10 +413,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 65535;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* byteBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -446,10 +454,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 65535;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* byteBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -486,10 +495,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 1024;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
 
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -527,10 +537,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 102400;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     TimestampVectorBatch* tsBatch = 
dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]);
@@ -568,8 +579,8 @@ namespace orc {
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
     MemoryPool* pool = getDefaultPool();
     std::unique_ptr<Type> type(Type::buildTypeFromString("struct<a:timestamp>"));
-    auto writer = createWriter(16 * 1024 * 1024, 64 * 1024, CompressionKind_ZLIB, *type, pool,
-                               &memStream, fileVersion);
+    auto writer = createWriter(16 * 1024 * 1024, 64 * 1024, 256 * 1024, CompressionKind_ZLIB, *type,
+                               pool, &memStream, fileVersion);
     uint64_t batchCount = 5;
     auto batch = writer->createRowBatch(batchCount * 2);
     auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
@@ -638,10 +649,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 1;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion, 0, writerTimezone);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion, 0, writerTimezone);
     auto batch = writer->createRowBatch(rowCount);
     auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
     auto& tsBatch = 
dynamic_cast<TimestampVectorBatch&>(*structBatch.fields[0]);
@@ -734,10 +746,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 102400;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     TimestampVectorBatch* tsBatch = 
dynamic_cast<TimestampVectorBatch*>(structBatch->fields[0]);
@@ -779,13 +792,14 @@ namespace orc {
     uint64_t stripeSize = 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 65535;
+    uint64_t memoryBlockSize = 64;
 
     char dataBuffer[327675];
     uint64_t offset = 0;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
 
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -858,10 +872,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;       // 16K
     uint64_t compressionBlockSize = 1024;  // 1k
     uint64_t rowCount = 1024;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     Decimal64VectorBatch* decBatch = 
dynamic_cast<Decimal64VectorBatch*>(structBatch->fields[0]);
@@ -934,10 +949,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 1024;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     Decimal128VectorBatch* decBatch = 
dynamic_cast<Decimal128VectorBatch*>(structBatch->fields[0]);
@@ -1022,10 +1038,11 @@ namespace orc {
     uint64_t rowCount = 1024;
     uint64_t maxListLength = 10;
     uint64_t offset = 0;
+    uint64_t memoryBlockSize = 8 * 1024;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount 
* maxListLength);
 
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -1081,10 +1098,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 1024, maxListLength = 10, offset = 0;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount 
* maxListLength);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     MapVectorBatch* mapBatch = 
dynamic_cast<MapVectorBatch*>(structBatch->fields[0]);
@@ -1162,10 +1180,11 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 3333;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     UnionVectorBatch* unionBatch = 
dynamic_cast<UnionVectorBatch*>(structBatch->fields[0]);
@@ -1257,9 +1276,10 @@ namespace orc {
     uint64_t stripeSize = 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 3;
+    uint64_t memoryBlockSize = 64;
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = 
writer->createRowBatch(rowCount);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     StringVectorBatch* charBatch = 
dynamic_cast<StringVectorBatch*>(structBatch->fields[0]);
@@ -1326,10 +1346,11 @@ namespace orc {
 
     uint64_t stripeSize = 1024;
     uint64_t compressionBlockSize = 1024;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
 
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(4);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -1407,10 +1428,11 @@ namespace orc {
 
     uint64_t stripeSize = 1024;
     uint64_t compressionBlockSize = 1024;
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
 
     // test data looks like below -
     // {0}
@@ -1485,12 +1507,13 @@ namespace orc {
 
     uint64_t stripeSize = 1024;
     uint64_t compressionBlockSize = 1024;
+    uint64_t memoryBlockSize = 64;
 
     // 10000 rows with every 1000 row as an RG
     // Each RG has 100 null rows except that the 5th RG is all null
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion, 1000);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion, 1000);
 
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(10000);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -1622,12 +1645,13 @@ namespace orc {
   TEST_P(WriterTest, testBloomFilter) {
     WriterOptions options;
     options.setStripeSize(1024)
-        .setCompressionBlockSize(64)
+        .setCompressionBlockSize(1024)
         .setCompression(CompressionKind_ZSTD)
         .setMemoryPool(getDefaultPool())
         .setRowIndexStride(10000)
         .setFileVersion(fileVersion)
-        .setColumnsUseBloomFilter({1, 2, 3});
+        .setColumnsUseBloomFilter({1, 2, 3})
+        .setMemoryBlockSize(64);
 
     // write 65535 rows of data
     MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
@@ -1716,7 +1740,7 @@ namespace orc {
       auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<col1:int,col2:int>"));
       WriterOptions options;
       options.setStripeSize(1024 * 1024)
-          .setCompressionBlockSize(1024)
+          .setMemoryBlockSize(1024)
           .setCompression(CompressionKind_NONE)
           .setMemoryPool(pool)
           .setRowIndexStride(1000);
@@ -1809,8 +1833,11 @@ namespace orc {
     uint64_t rowCount = 5000000;
     auto type = std::unique_ptr<Type>(Type::buildTypeFromString("struct<c0:int>"));
     WriterOptions options;
-    options.setStripeSize(1024).setCompressionBlockSize(1024).setCompression(kind).setMemoryPool(
-        pool);
+    options.setStripeSize(1024)
+        .setCompressionBlockSize(1024)
+        .setMemoryBlockSize(64)
+        .setCompression(kind)
+        .setMemoryPool(pool);
 
     auto writer = createWriter(*type, &memStream, options);
     auto batch = writer->createRowBatch(rowCount);
@@ -1853,10 +1880,11 @@ namespace orc {
       WriterOptions options;
       options.setStripeSize(1024 * 1024)
           .setCompressionBlockSize(64 * 1024)
+          .setMemoryBlockSize(1024)
           .setCompression(CompressionKind_NONE)
           .setMemoryPool(pool)
           .setRowIndexStride(1000)
-          .setOutputBufferCapacity(capacity);
+          .setCompressionBlockSize(capacity);
 
       auto writer = createWriter(*type, &memStream, options);
       auto batch = writer->createRowBatch(rowCount);
@@ -1913,6 +1941,7 @@ namespace orc {
     uint64_t stripeSize = 16 * 1024;
     uint64_t compressionBlockSize = 1024;
     uint64_t rowCount = 65530;
+    uint64_t memoryBlockSize = 64;
 
     std::vector<double> data(rowCount);
     for (uint64_t i = 0; i < rowCount; ++i) {
@@ -1920,8 +1949,8 @@ namespace orc {
     }
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion, 0, "GMT", true);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion, 0, "GMT", true);
     // start from here/
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(rowCount 
/ 2);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
@@ -2010,10 +2039,11 @@ namespace orc {
 
     uint64_t stripeSize = 1024;            // 1K
     uint64_t compressionBlockSize = 1024;  // 1k
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* longBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -2065,10 +2095,11 @@ namespace orc {
 
     uint64_t stripeSize = 1024;            // 1K
     uint64_t compressionBlockSize = 1024;  // 1k
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion);
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion);
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(65535);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* longBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);
@@ -2131,6 +2162,7 @@ namespace orc {
       WriterOptions options;
       options.setStripeSize(16 * 1024)
           .setCompressionBlockSize(1024)
+          .setMemoryBlockSize(64)
           .setCompression(CompressionKind_NONE)
           .setMemoryPool(pool)
           .setRowIndexStride(1000);
@@ -2208,10 +2240,11 @@ namespace orc {
 
     uint64_t stripeSize = 1024;            // 1K
     uint64_t compressionBlockSize = 1024;  // 1k
+    uint64_t memoryBlockSize = 64;
 
     std::unique_ptr<Writer> writer =
-        createWriter(stripeSize, compressionBlockSize, CompressionKind_ZLIB, 
*type, pool,
-                     &memStream, fileVersion, 0, "/ERROR/TIMEZONE");
+        createWriter(stripeSize, memoryBlockSize, compressionBlockSize, 
CompressionKind_ZLIB, *type,
+                     pool, &memStream, fileVersion, 0, "/ERROR/TIMEZONE");
     std::unique_ptr<ColumnVectorBatch> batch = writer->createRowBatch(10);
     StructVectorBatch* structBatch = 
dynamic_cast<StructVectorBatch*>(batch.get());
     LongVectorBatch* longBatch = 
dynamic_cast<LongVectorBatch*>(structBatch->fields[0]);

Reply via email to