This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new a89083caf PARQUET-2429: Reduce direct input buffer churn (#1270)
a89083caf is described below

commit a89083caff1ce84d21ddf7dbd3d4ad299bc584b3
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Apr 22 22:05:24 2024 -0700

    PARQUET-2429: Reduce direct input buffer churn (#1270)
    
    Currently input buffers are grown one chunk at a time as the
    compressor or decompressor receives successive setInput calls. When
    decompressing a 64MB block using a 4KB chunk size, this leads to
    thousands of allocations and deallocations totaling GBs of memory.
---
 .../parquet/hadoop/codec/NonBlockedCompressor.java    | 19 ++++++++++++++++++-
 .../parquet/hadoop/codec/NonBlockedDecompressor.java  | 19 ++++++++++++++++++-
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
index 24d453028..60a5dba6d 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
@@ -30,6 +30,15 @@ import org.apache.parquet.Preconditions;
  */
 public abstract class NonBlockedCompressor implements Compressor {
 
+  private static final int INITIAL_INPUT_BUFFER_SIZE = 4096;
+
+  /**
+   * Input buffer starts at {@link #INITIAL_INPUT_BUFFER_SIZE} and then grows 
by this factor every time it needs
+   * additional space. This factor is chosen to balance the time to reach the 
target size against the excess peak
+   * memory usage due to overshooting the target.
+   */
+  private static final double INPUT_BUFFER_GROWTH_FACTOR = 1.2;
+
   // Buffer for compressed output. This buffer grows as necessary.
   private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
 
@@ -100,7 +109,15 @@ public abstract class NonBlockedCompressor implements 
Compressor {
         !outputBuffer.hasRemaining(), "Output buffer should be empty. Caller 
must call compress()");
 
     if (inputBuffer.capacity() - inputBuffer.position() < len) {
-      ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+      final int newBufferSize;
+      if (inputBuffer.capacity() == 0) {
+        newBufferSize = Math.max(INITIAL_INPUT_BUFFER_SIZE, len);
+      } else {
+        newBufferSize = Math.max(
+            inputBuffer.position() + len, (int) (inputBuffer.capacity() * 
INPUT_BUFFER_GROWTH_FACTOR));
+      }
+      ByteBuffer tmp = ByteBuffer.allocateDirect(newBufferSize);
+      tmp.limit(inputBuffer.position() + len);
       inputBuffer.rewind();
       tmp.put(inputBuffer);
       ByteBuffer oldBuffer = inputBuffer;
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
index 7973a7cc7..95109aee7 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
@@ -25,6 +25,15 @@ import org.apache.parquet.Preconditions;
 
 public abstract class NonBlockedDecompressor implements Decompressor {
 
+  private static final int INITIAL_INPUT_BUFFER_SIZE = 4096;
+
+  /**
+   * Input buffer starts at {@link #INITIAL_INPUT_BUFFER_SIZE} and then grows 
by this factor every time it needs
+   * additional space. This factor is chosen to balance the time to reach the 
target size against the excess peak
+   * memory usage due to overshooting the target.
+   */
+  private static final double INPUT_BUFFER_GROWTH_FACTOR = 1.2;
+
   // Buffer for uncompressed output. This buffer grows as necessary.
   private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
 
@@ -105,7 +114,15 @@ public abstract class NonBlockedDecompressor implements 
Decompressor {
     SnappyUtil.validateBuffer(buffer, off, len);
 
     if (inputBuffer.capacity() - inputBuffer.position() < len) {
-      final ByteBuffer newBuffer = 
ByteBuffer.allocateDirect(inputBuffer.position() + len);
+      final int newBufferSize;
+      if (inputBuffer.capacity() == 0) {
+        newBufferSize = Math.max(INITIAL_INPUT_BUFFER_SIZE, len);
+      } else {
+        newBufferSize = Math.max(
+            inputBuffer.position() + len, (int) (inputBuffer.capacity() * 
INPUT_BUFFER_GROWTH_FACTOR));
+      }
+      final ByteBuffer newBuffer = ByteBuffer.allocateDirect(newBufferSize);
+      newBuffer.limit(inputBuffer.position() + len);
       inputBuffer.rewind();
       newBuffer.put(inputBuffer);
       final ByteBuffer oldBuffer = inputBuffer;

Reply via email to