This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new f799893 PARQUET-1533: TestSnappy() throws OOM exception with
Parquet-1485 change (#622)
f799893 is described below
commit f7998934020ea6f4949e347616431219343d8a15
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Mon Feb 25 13:23:32 2019 +0100
PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change
(#622)
---
.../parquet/hadoop/codec/SnappyCompressor.java | 26 ++++------------------
.../parquet/hadoop/codec/SnappyDecompressor.java | 26 ++++------------------
2 files changed, 8 insertions(+), 44 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
index b2a8e7f..4720c08 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyCompressor.java
@@ -32,23 +32,16 @@ import org.apache.parquet.Preconditions;
* entire input in setInput and compresses it as one compressed block.
*/
public class SnappyCompressor implements Compressor {
- private static final int initialBufferSize = 64 * 1024 * 1024;
-
// Buffer for compressed output. This buffer grows as necessary.
- private ByteBuffer outputBuffer =
ByteBuffer.allocateDirect(initialBufferSize);
+ private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
// Buffer for uncompressed input. This buffer grows as necessary.
- private ByteBuffer inputBuffer =
ByteBuffer.allocateDirect(initialBufferSize);
+ private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
private long bytesRead = 0L;
private long bytesWritten = 0L;
private boolean finishCalled = false;
- public SnappyCompressor() {
- inputBuffer.limit(0);
- outputBuffer.limit(0);
- }
-
/**
* Fills specified buffer with compressed data. Returns actual number
* of bytes of compressed data. A return value of 0 indicates that
@@ -120,7 +113,8 @@ public class SnappyCompressor implements Compressor {
@Override
public void end() {
- // No-op
+ CleanUtil.clean(inputBuffer);
+ CleanUtil.clean(outputBuffer);
}
@Override
@@ -157,18 +151,6 @@ public class SnappyCompressor implements Compressor {
@Override
public synchronized void reset() {
- if (inputBuffer.capacity() > initialBufferSize) {
- ByteBuffer oldBuffer = inputBuffer;
- inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
- CleanUtil.clean(oldBuffer);
- }
-
- if (outputBuffer.capacity() > initialBufferSize) {
- ByteBuffer oldBuffer = outputBuffer;
- outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
- CleanUtil.clean(oldBuffer);
- }
-
finishCalled = false;
bytesRead = bytesWritten = 0;
inputBuffer.rewind();
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
index 8a7f86d..c3da63f 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/SnappyDecompressor.java
@@ -27,21 +27,14 @@ import org.xerial.snappy.Snappy;
import org.apache.parquet.Preconditions;
public class SnappyDecompressor implements Decompressor {
- private static final int initialBufferSize = 64 * 1024 * 1024;
-
// Buffer for uncompressed output. This buffer grows as necessary.
- private ByteBuffer outputBuffer =
ByteBuffer.allocateDirect(initialBufferSize);
+ private ByteBuffer outputBuffer = ByteBuffer.allocateDirect(0);
// Buffer for compressed input. This buffer grows as necessary.
- private ByteBuffer inputBuffer =
ByteBuffer.allocateDirect(initialBufferSize);
+ private ByteBuffer inputBuffer = ByteBuffer.allocateDirect(0);
private boolean finished;
- public SnappyDecompressor() {
- inputBuffer.limit(0);
- outputBuffer.limit(0);
- }
-
/**
* Fills specified buffer with uncompressed data. Returns actual number
* of bytes of uncompressed data. A return value of 0 indicates that
@@ -122,7 +115,8 @@ public class SnappyDecompressor implements Decompressor {
@Override
public void end() {
- // No-op
+ CleanUtil.clean(inputBuffer);
+ CleanUtil.clean(outputBuffer);
}
@Override
@@ -142,18 +136,6 @@ public class SnappyDecompressor implements Decompressor {
@Override
public synchronized void reset() {
- if (inputBuffer.capacity() > initialBufferSize) {
- ByteBuffer oldBuffer = inputBuffer;
- inputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
- CleanUtil.clean(oldBuffer);
- }
-
- if (outputBuffer.capacity() > initialBufferSize) {
- ByteBuffer oldBuffer = outputBuffer;
- outputBuffer = ByteBuffer.allocateDirect(initialBufferSize);
- CleanUtil.clean(oldBuffer);
- }
-
finished = false;
inputBuffer.rewind();
outputBuffer.rewind();