This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new bb190bbc34 ARROW-16473: [Go] fixing memory leak in serializedPageReader
bb190bbc34 is described below

commit bb190bbc345547aefe4aec3e1bc878c645c2cc50
Author: Min-Young Wu <[email protected]>
AuthorDate: Wed May 11 15:55:38 2022 -0400

    ARROW-16473: [Go] fixing memory leak in serializedPageReader
    
    `parquet/file.serializedPageReader` has a 
[memory.Buffer](https://github.com/apache/arrow/blob/8bd5514f52bf9cc542a389edaf697cbc2c97b752/go/parquet/file/page_reader.go#L299)
 attribute (presumably to reuse across page reads). But at the end of 
`serializedPageReader.Next` (in the non-error case), a new `memory.Buffer` is 
[created](https://github.com/apache/arrow/blob/8bd5514f52bf9cc542a389edaf697cbc2c97b752/go/parquet/file/page_reader.go#L615)
 without releasing the pre-existing `p.buf`,  [...]
    
    Existing tests updated to test and catch this (`parquet/file` now uses 
`CheckedAllocator`).
    
    Closes #13068 from minyoung/user/minyoung/0504-serialized-page-reader-leak
    
    Authored-by: Min-Young Wu <[email protected]>
    Signed-off-by: Matthew Topol <[email protected]>
---
 go/parquet/file/page_reader.go         | 32 ++++++++++++++++----------------
 go/parquet/pqarrow/file_reader_test.go |  7 ++++---
 2 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/go/parquet/file/page_reader.go b/go/parquet/file/page_reader.go
index 35e2bda087..dc6a1f10ae 100644
--- a/go/parquet/file/page_reader.go
+++ b/go/parquet/file/page_reader.go
@@ -296,7 +296,6 @@ type serializedPageReader struct {
        codec    compress.Codec
 
        curPageHdr        *format.PageHeader
-       buf               *memory.Buffer
        pageOrd           int16
        maxPageHeaderSize int
 
@@ -326,7 +325,6 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, 
compressType compress.
                nrows:             nrows,
                mem:               mem,
                codec:             codec,
-               buf:               memory.NewResizableBuffer(mem),
        }
        rdr.decompressBuffer.Grow(defaultPageHeaderSize)
        if ctx != nil {
@@ -345,7 +343,6 @@ func (p *serializedPageReader) Reset(r 
parquet.BufferedReader, nrows int64, comp
        if p.err != nil {
                return
        }
-       p.buf.ResizeNoShrink(0)
        p.decompressBuffer.Reset()
        if ctx != nil {
                p.cryptoCtx = *ctx
@@ -488,7 +485,9 @@ func (p *serializedPageReader) Next() bool {
                        p.updateDecryption(p.cryptoCtx.DataDecryptor, 
encryption.DictPageModule, p.dataPageAad)
                }
 
-               p.buf.ResizeNoShrink(lenUncompressed)
+               buf := memory.NewResizableBuffer(p.mem)
+               defer buf.Release()
+               buf.ResizeNoShrink(lenUncompressed)
 
                switch p.curPageHdr.GetType() {
                case format.PageType_DICTIONARY_PAGE:
@@ -499,7 +498,7 @@ func (p *serializedPageReader) Next() bool {
                                return false
                        }
 
-                       data, err := p.decompress(lenCompressed, p.buf.Bytes())
+                       data, err := p.decompress(lenCompressed, buf.Bytes())
                        if err != nil {
                                p.err = err
                                return false
@@ -529,7 +528,7 @@ func (p *serializedPageReader) Next() bool {
                        }
 
                        p.rowsSeen += int64(dataHeader.GetNumValues())
-                       data, err := p.decompress(lenCompressed, p.buf.Bytes())
+                       data, err := p.decompress(lenCompressed, buf.Bytes())
                        if err != nil {
                                p.err = err
                                return false
@@ -574,27 +573,30 @@ func (p *serializedPageReader) Next() bool {
                                return false
                        }
 
-                       var data []byte
+                       var pagebuf *memory.Buffer
                        if compressed {
                                if levelsBytelen > 0 {
-                                       io.ReadFull(p.r, 
p.buf.Bytes()[:levelsBytelen])
+                                       io.ReadFull(p.r, 
buf.Bytes()[:levelsBytelen])
                                }
-                               if data, p.err = 
p.decompress(lenCompressed-levelsBytelen, p.buf.Bytes()[levelsBytelen:]); p.err 
!= nil {
+                               var data []byte
+                               if data, p.err = 
p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err 
!= nil {
                                        return false
                                }
+                               pagebuf = memory.NewBufferBytes(data)
                        } else {
-                               io.ReadFull(p.r, p.buf.Bytes())
-                               data = p.buf.Bytes()
+                               io.ReadFull(p.r, buf.Bytes())
+                               pagebuf = buf
+                               pagebuf.Retain()
                        }
-                       if len(data) != lenUncompressed {
-                               p.err = fmt.Errorf("parquet: metadata said %d 
bytes uncompressed data page, got %d bytes", lenUncompressed, len(data))
+                       if pagebuf.Len() != lenUncompressed {
+                               p.err = fmt.Errorf("parquet: metadata said %d 
bytes uncompressed data page, got %d bytes", lenUncompressed, pagebuf.Len())
                                return false
                        }
 
                        // make datapage v2
                        p.curPage = &DataPageV2{
                                page: page{
-                                       buf:      memory.NewBufferBytes(data),
+                                       buf:      pagebuf,
                                        typ:      p.curPageHdr.Type,
                                        nvals:    dataHeader.GetNumValues(),
                                        encoding: dataHeader.GetEncoding(),
@@ -611,8 +613,6 @@ func (p *serializedPageReader) Next() bool {
                        // we don't know this page type, we're allowed to skip 
non-data pages
                        continue
                }
-
-               p.buf = memory.NewResizableBuffer(p.mem)
                return true
        }
 
diff --git a/go/parquet/pqarrow/file_reader_test.go 
b/go/parquet/pqarrow/file_reader_test.go
index d60f21a76e..0a3ed623c9 100644
--- a/go/parquet/pqarrow/file_reader_test.go
+++ b/go/parquet/pqarrow/file_reader_test.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/arrow/go/v9/arrow/array"
        "github.com/apache/arrow/go/v9/arrow/decimal128"
        "github.com/apache/arrow/go/v9/arrow/memory"
+       "github.com/apache/arrow/go/v9/parquet"
        "github.com/apache/arrow/go/v9/parquet/file"
        "github.com/apache/arrow/go/v9/parquet/pqarrow"
        "github.com/stretchr/testify/assert"
@@ -63,7 +64,7 @@ func TestArrowReaderAdHocReadDecimals(t *testing.T) {
                        filename := filepath.Join(dataDir, tt.file+".parquet")
                        require.FileExists(t, filename)
 
-                       rdr, err := file.OpenParquetFile(filename, false)
+                       rdr, err := file.OpenParquetFile(filename, false, 
file.WithReadProps(parquet.NewReaderProperties(mem)))
                        require.NoError(t, err)
                        defer rdr.Close()
                        arrowRdr, err := pqarrow.NewFileReader(rdr, 
pqarrow.ArrowReadProperties{}, mem)
@@ -107,7 +108,7 @@ func TestRecordReaderParallel(t *testing.T) {
        var buf bytes.Buffer
        require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
 
-       pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+       pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), 
file.WithReadProps(parquet.NewReaderProperties(mem)))
        require.NoError(t, err)
 
        reader, err := pqarrow.NewFileReader(pf, 
pqarrow.ArrowReadProperties{BatchSize: 3, Parallel: true}, mem)
@@ -153,7 +154,7 @@ func TestRecordReaderSerial(t *testing.T) {
        var buf bytes.Buffer
        require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
 
-       pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
+       pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), 
file.WithReadProps(parquet.NewReaderProperties(mem)))
        require.NoError(t, err)
 
        reader, err := pqarrow.NewFileReader(pf, 
pqarrow.ArrowReadProperties{BatchSize: 2}, mem)

Reply via email to