This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 53f65a81d PARQUET-2160: Close ZstdInputStream to free off-heap memory in time. (#982)
53f65a81d is described below

commit 53f65a81dc5c2067aa258dfe8d832a6ff9fd0e9c
Author: Yujiang Zhong <[email protected]>
AuthorDate: Thu Sep 22 21:21:31 2022 +0800

    PARQUET-2160: Close ZstdInputStream to free off-heap memory in time. (#982)
    
    * PARQUET-2160: Close ZstdInputStream to free off-heap memory in time.
    
    * Add comment.
---
 .../main/java/org/apache/parquet/hadoop/CodecFactory.java   | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index f0e7af35c..1998ea09d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.codec.ZstandardCodec;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
 public class CodecFactory implements CompressionCodecFactory {
@@ -109,7 +110,17 @@ public class CodecFactory implements CompressionCodecFactory {
           decompressor.reset();
         }
         InputStream is = codec.createInputStream(bytes.toInputStream(), 
decompressor);
-        decompressed = BytesInput.from(is, uncompressedSize);
+
+        // We need to explicitly close the ZstdDecompressorStream here to 
release the resources it holds to avoid
+        // off-heap memory fragmentation issue, see 
https://issues.apache.org/jira/browse/PARQUET-2160.
+        // This change will load the decompressor stream into heap a little 
earlier, since the problem it solves
+        // only happens in the ZSTD codec, so this modification is only made 
for ZSTD streams.
+        if (codec instanceof ZstandardCodec) {
+          decompressed = BytesInput.copy(BytesInput.from(is, 
uncompressedSize));
+          is.close();
+        } else {
+          decompressed = BytesInput.from(is, uncompressedSize);
+        }
       } else {
         decompressed = bytes;
       }

Reply via email to