This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new a89083caf PARQUET-2429: Reduce direct input buffer churn (#1270)
a89083caf is described below
commit a89083caff1ce84d21ddf7dbd3d4ad299bc584b3
Author: Gian Merlino <[email protected]>
AuthorDate: Mon Apr 22 22:05:24 2024 -0700
PARQUET-2429: Reduce direct input buffer churn (#1270)
Currently input buffers are grown one chunk at a time as the
compressor or decompressor receives successive setInput calls. When
decompressing a 64MB block using a 4KB chunk size, this leads to
thousands of allocations and deallocations totaling GBs of memory.
---
.../parquet/hadoop/codec/NonBlockedCompressor.java | 19 ++++++++++++++++++-
.../parquet/hadoop/codec/NonBlockedDecompressor.java | 19 ++++++++++++++++++-
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
index 24d453028..60a5dba6d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedCompressor.java
@@ -30,6 +30,15 @@ import org.apache.parquet.Preconditions;
*/
public abstract class NonBlockedCompressor implements Compressor {
+ private static final int INITIAL_INPUT_BUFFER_SIZE = 4096;
+
+ /**
+ * Input buffer starts at {@link #INITIAL_INPUT_BUFFER_SIZE} and then grows by this factor every time it needs
+ * additional space. This factor is chosen to balance the time to reach the target size against the excess peak
+ * memory usage due to overshooting the target.
+ */
+ private static final double INPUT_BUFFER_GROWTH_FACTOR = 1.2;
+
// Buffer for compressed output. This buffer grows as necessary.
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
@@ -100,7 +109,15 @@ public abstract class NonBlockedCompressor implements Compressor {
!outputBuffer.hasRemaining(), "Output buffer should be empty. Caller must call compress()");
if (inputBuffer.capacity() - inputBuffer.position() < len) {
- ByteBuffer tmp = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+ final int newBufferSize;
+ if (inputBuffer.capacity() == 0) {
+ newBufferSize = Math.max(INITIAL_INPUT_BUFFER_SIZE, len);
+ } else {
+ newBufferSize = Math.max(
+ inputBuffer.position() + len, (int) (inputBuffer.capacity() * INPUT_BUFFER_GROWTH_FACTOR));
+ }
+ ByteBuffer tmp = ByteBuffer.allocateDirect(newBufferSize);
+ tmp.limit(inputBuffer.position() + len);
inputBuffer.rewind();
tmp.put(inputBuffer);
ByteBuffer oldBuffer = inputBuffer;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
index 7973a7cc7..95109aee7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/NonBlockedDecompressor.java
@@ -25,6 +25,15 @@ import org.apache.parquet.Preconditions;
public abstract class NonBlockedDecompressor implements Decompressor {
+ private static final int INITIAL_INPUT_BUFFER_SIZE = 4096;
+
+ /**
+ * Input buffer starts at {@link #INITIAL_INPUT_BUFFER_SIZE} and then grows by this factor every time it needs
+ * additional space. This factor is chosen to balance the time to reach the target size against the excess peak
+ * memory usage due to overshooting the target.
+ */
+ private static final double INPUT_BUFFER_GROWTH_FACTOR = 1.2;
+
// Buffer for uncompressed output. This buffer grows as necessary.
private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
@@ -105,7 +114,15 @@ public abstract class NonBlockedDecompressor implements Decompressor {
SnappyUtil.validateBuffer(buffer, off, len);
if (inputBuffer.capacity() - inputBuffer.position() < len) {
- final ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
+ final int newBufferSize;
+ if (inputBuffer.capacity() == 0) {
+ newBufferSize = Math.max(INITIAL_INPUT_BUFFER_SIZE, len);
+ } else {
+ newBufferSize = Math.max(
+ inputBuffer.position() + len, (int) (inputBuffer.capacity() * INPUT_BUFFER_GROWTH_FACTOR));
+ }
+ final ByteBuffer newBuffer = ByteBuffer.allocateDirect(newBufferSize);
+ newBuffer.limit(inputBuffer.position() + len);
inputBuffer.rewind();
newBuffer.put(inputBuffer);
final ByteBuffer oldBuffer = inputBuffer;