[
https://issues.apache.org/jira/browse/PARQUET-2160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17582605#comment-17582605
]
ASF GitHub Bot commented on PARQUET-2160:
-----------------------------------------
shangxinli commented on code in PR #982:
URL: https://github.com/apache/parquet-mr/pull/982#discussion_r950883464
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java:
##########
@@ -109,7 +110,17 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx
         decompressor.reset();
       }
       InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
-      decompressed = BytesInput.from(is, uncompressedSize);
+
+      // We need to explicitly close the ZstdDecompressorStream here to release the resources it holds to avoid
+      // off-heap memory fragmentation issue, see https://issues.apache.org/jira/browse/PARQUET-2160.
+      // This change will load the decompressor stream into heap a little earlier, since the problem it solves
+      // only happens in the ZSTD codec, so this modification is only made for ZSTD streams.
+      if (codec instanceof ZstandardCodec) {
+        decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
Review Comment:
I understand from the discussion in the Jira that BytesInput.copy() just loads the data onto the heap a bit earlier and does not add extra overhead overall. Can we have a benchmark on the heap/GC impact (heap size, GC time, etc.)? I just want to make sure we are not fixing one problem while introducing another.
Other than that, treating ZSTD as a special case is probably OK, since we have pretty decent comments explaining it.
> Close decompression stream to free off-heap memory in time
> ----------------------------------------------------------
>
> Key: PARQUET-2160
> URL: https://issues.apache.org/jira/browse/PARQUET-2160
> Project: Parquet
> Issue Type: Improvement
>         Environment: Spark 3.1.2 + Iceberg 0.12 + Parquet 1.12.3 + zstd-jni 1.4.9.1 + glibc
> Reporter: Yujiang Zhong
> Priority: Major
>
> The decompressed stream in HeapBytesDecompressor$decompress currently relies on the
> JVM GC to be closed. When reading Parquet files in zstd-compressed format, I sometimes
> ran into OOMs caused by high off-heap usage. I think the reason is that the GC is
> not timely and this causes off-heap memory fragmentation. I had to set a lower
> MALLOC_TRIM_THRESHOLD_ to make glibc give memory back to the system quickly.
> There is a
> [thread|https://apache-iceberg.slack.com/archives/C025PH0G1D4/p1650928750269869?thread_ts=1650927062.590789&cid=C025PH0G1D4]
> about this zstd Parquet issue in the Iceberg community Slack: some people had the
> same problem.
> I think we can use ByteArrayBytesInput as the decompressed bytes input and
> close the decompression stream promptly to solve this problem:
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.from(is, uncompressedSize); {code}
> ->
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> is.close(); {code}
> After I made this change to decompress, I found that off-heap memory usage was
> significantly reduced (for the same query on the same data).
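> A sketch of the same change written with try/finally (not the exact patch in the PR;
> variable names follow the snippet above), so the stream is closed even if the copy throws:
> {code:java}
> InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
> try {
>   // Copying materializes the page on the heap, so the native buffers held by the
>   // ZSTD stream can be freed immediately instead of waiting for GC.
>   decompressed = BytesInput.copy(BytesInput.from(is, uncompressedSize));
> } finally {
>   is.close();
> } {code}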
--
This message was sent by Atlassian Jira
(v8.20.10#820010)