This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 667d427d8e ARROW-16163: [Go] IPC FileReader leaks memory when used
with ZSTD compression
667d427d8e is described below
commit 667d427d8ef16b1535dc82ce6fe6b7384382231c
Author: Nicolas Moreau <[email protected]>
AuthorDate: Wed Apr 13 13:15:37 2022 -0400
ARROW-16163: [Go] IPC FileReader leaks memory when used with ZSTD
compression
When used in its streaming mode (calling .Read()), the ZSTD decoder makes
use of a goroutine with a channel to receive input. This channel is closed by
the decoder's .Close() function, which ends the goroutine and therefore allows
its memory to be collected by the GC.
Here, we add Close() to the exposed decompressor interface and call it
at the end of the function that uses it.
Closes #12857 from cptjacky/ARROW-16163
Authored-by: Nicolas Moreau <[email protected]>
Signed-off-by: Matthew Topol <[email protected]>
---
go/arrow/ipc/compression.go | 13 ++++++++++++-
go/arrow/ipc/file_reader.go | 1 +
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go
index 352f8ed92c..3f1e00a123 100644
--- a/go/arrow/ipc/compression.go
+++ b/go/arrow/ipc/compression.go
@@ -82,6 +82,7 @@ func getCompressor(codec flatbuf.CompressionType) compressor {
type decompressor interface {
io.Reader
Reset(io.Reader)
+ Close()
}
type zstdDecompressor struct {
@@ -94,10 +95,20 @@ func (z *zstdDecompressor) Reset(r io.Reader) {
}
}
+func (z *zstdDecompressor) Close() {
+ z.Decoder.Close()
+}
+
+type lz4Decompressor struct {
+ *lz4.Reader
+}
+
+func (z *lz4Decompressor) Close() {}
+
func getDecompressor(codec flatbuf.CompressionType) decompressor {
switch codec {
case flatbuf.CompressionTypeLZ4_FRAME:
- return lz4.NewReader(nil)
+ return &lz4Decompressor{lz4.NewReader(nil)}
case flatbuf.CompressionTypeZSTD:
dec, err := zstd.NewReader(nil)
if err != nil {
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 3734fd6781..98cf7ba997 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -326,6 +326,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer,
body ReadAtSeeker, mem
bodyCompress := md.Compression(nil)
if bodyCompress != nil {
codec = getDecompressor(bodyCompress.Codec())
+ defer codec.Close()
}
ctx := &arrayLoaderContext{