This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 667d427d8e ARROW-16163: [Go] IPC FileReader leaks memory when used 
with ZSTD compression
667d427d8e is described below

commit 667d427d8ef16b1535dc82ce6fe6b7384382231c
Author: Nicolas Moreau <[email protected]>
AuthorDate: Wed Apr 13 13:15:37 2022 -0400

    ARROW-16163: [Go] IPC FileReader leaks memory when used with ZSTD 
compression
    
    When used in streaming mode (calling .Read()), the ZSTD decoder uses a 
goroutine with a channel to receive input. This channel is closed by the 
decoder's .Close() function, which ends the goroutine and therefore allows 
its memory to be collected by the GC.
    
    Here, we add Close() to the exposed decompressor interface and call it 
(via defer) at the end of the function that uses the decompressor.
    
    Closes #12857 from cptjacky/ARROW-16163
    
    Authored-by: Nicolas Moreau <[email protected]>
    Signed-off-by: Matthew Topol <[email protected]>
---
 go/arrow/ipc/compression.go | 13 ++++++++++++-
 go/arrow/ipc/file_reader.go |  1 +
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/go/arrow/ipc/compression.go b/go/arrow/ipc/compression.go
index 352f8ed92c..3f1e00a123 100644
--- a/go/arrow/ipc/compression.go
+++ b/go/arrow/ipc/compression.go
@@ -82,6 +82,7 @@ func getCompressor(codec flatbuf.CompressionType) compressor {
 type decompressor interface {
        io.Reader
        Reset(io.Reader)
+       Close()
 }
 
 type zstdDecompressor struct {
@@ -94,10 +95,20 @@ func (z *zstdDecompressor) Reset(r io.Reader) {
        }
 }
 
+func (z *zstdDecompressor) Close() {
+       z.Decoder.Close()
+}
+
+type lz4Decompressor struct {
+       *lz4.Reader
+}
+
+func (z *lz4Decompressor) Close() {}
+
 func getDecompressor(codec flatbuf.CompressionType) decompressor {
        switch codec {
        case flatbuf.CompressionTypeLZ4_FRAME:
-               return lz4.NewReader(nil)
+               return &lz4Decompressor{lz4.NewReader(nil)}
        case flatbuf.CompressionTypeZSTD:
                dec, err := zstd.NewReader(nil)
                if err != nil {
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 3734fd6781..98cf7ba997 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -326,6 +326,7 @@ func newRecord(schema *arrow.Schema, meta *memory.Buffer, 
body ReadAtSeeker, mem
        bodyCompress := md.Compression(nil)
        if bodyCompress != nil {
                codec = getDecompressor(bodyCompress.Codec())
+               defer codec.Close()
        }
 
        ctx := &arrayLoaderContext{

Reply via email to