This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 53f65a81d PARQUET-2160: Close ZstdInputStream to free off-heap memory in time. (#982)
53f65a81d is described below
commit 53f65a81dc5c2067aa258dfe8d832a6ff9fd0e9c
Author: Yujiang Zhong <[email protected]>
AuthorDate: Thu Sep 22 21:21:31 2022 +0800
PARQUET-2160: Close ZstdInputStream to free off-heap memory in time. (#982)
* PARQUET-2160: Close ZstdInputStream to free off-heap memory in time.
* Add comment.
---
.../main/java/org/apache/parquet/hadoop/CodecFactory.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index f0e7af35c..1998ea09d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
public class CodecFactory implements CompressionCodecFactory {
@@ -109,7 +110,17 @@ public class CodecFactory implements CompressionCodecFactory {
decompressor.reset();
}
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
- decompressed = BytesInput.from(is, uncompressedSize);
+
+ // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to avoid
+ // off-heap memory fragmentation issue, see https://issues.apache.org/jira/browse/PARQUET-2160.
+ // This change will load the decompressor stream into heap a little earlier, since the problem it solves
+ // only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
+ if (codec instanceof ZstandardCodec) {
+ decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
+ is.close();
+ } else {
+ decompressed = BytesInput.from(is, uncompressedSize);
+ }
} else {
decompressed = bytes;
}