This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new f0b6fd9e feat(parquet): utilize memory allocator in 
`serializedPageReader` (#485)
f0b6fd9e is described below

commit f0b6fd9eacfd244cdef200a6115873e8279f4297
Author: Ruihao Chen <[email protected]>
AuthorDate: Tue Sep 9 04:26:54 2025 +0800

    feat(parquet): utilize memory allocator in `serializedPageReader` (#485)
    
    ### Rationale for this change
    
    Previously, only data page v2 in the reader would use memory
    allocated from the allocator, and there seems to be some misuse of the
    allocator.
    
    ### What changes are included in this PR?
    
    Use memory allocator to allocate buffer for `serializedPageReader`, and
    introduce `Close` function to make sure all the memory will be freed
    after usage.
    
    ### Are these changes tested?
    
    Track the memory usage in existing tests to ensure all the allocated
    memory is freed.
    
    ### Are there any user-facing changes?
    
    For users with no explicitly set allocators, nothing needs to change.
    Otherwise, they should remember to call
    `ColumnChunkReader.Close` after reading each chunk.
    
    ---------
    
    Signed-off-by: Ruihao Chen <[email protected]>
    Signed-off-by: Ruihao Chen <[email protected]>
---
 parquet/file/column_reader.go                     |  93 +++++++++++++-----
 parquet/file/column_reader_types.gen.go           | 110 ++++++++++++++++++----
 parquet/file/column_reader_types.gen.go.tmpl      |  31 +++++-
 parquet/file/column_writer_test.go                |  30 ++++++
 parquet/file/file_reader_test.go                  |  17 +++-
 parquet/file/file_writer_test.go                  |   1 +
 parquet/file/page_reader.go                       |  62 +++++++-----
 parquet/file/record_reader.go                     |   1 +
 parquet/internal/encoding/physical_types.tmpldata |  24 +++--
 parquet/internal/testutils/pagebuilder.go         |   4 +
 parquet/internal/testutils/primitive_typed.go     |  39 +++++++-
 parquet/pqarrow/encode_dictionary_test.go         |   1 +
 12 files changed, 336 insertions(+), 77 deletions(-)

diff --git a/parquet/file/column_reader.go b/parquet/file/column_reader.go
index 03ca5a8f..5597689b 100644
--- a/parquet/file/column_reader.go
+++ b/parquet/file/column_reader.go
@@ -38,6 +38,23 @@ const (
        defaultPageHeaderSize = 16 * 1024
 )
 
+// cloneByteArray is a helper function to clone a slice of byte slices
+func cloneByteArray[T ~[]byte](src []T) {
+       totalLength := 0
+       for i := range src {
+               totalLength += len(src[i])
+       }
+
+       buf := make([]byte, totalLength)
+       pos := 0
+       for i := range src {
+               srcLen := len(src[i])
+               copy(buf[pos:pos+srcLen], src[i])
+               src[i] = T(buf[pos : pos+srcLen])
+               pos += srcLen
+       }
+}
+
 //go:generate go run ../../arrow/_tools/tmpl/main.go -i 
-data=../internal/encoding/physical_types.tmpldata 
column_reader_types.gen.go.tmpl
 
 func isDictIndexEncoding(e format.Encoding) bool {
@@ -113,6 +130,8 @@ type ColumnChunkReader interface {
        // automatically read the first page of the page reader passed in until
        // HasNext which will read in the next page.
        setPageReader(PageReader)
+       // Close releases the resources held by the column reader.
+       Close() error
 }
 
 type columnChunkReader struct {
@@ -221,12 +240,24 @@ func (c *columnChunkReader) consumeBufferedValues(n 
int64) { c.numDecoded += n }
 func (c *columnChunkReader) numAvailValues() int64         { return 
c.numBuffered - c.numDecoded }
 func (c *columnChunkReader) pager() PageReader             { return c.rdr }
 func (c *columnChunkReader) setPageReader(rdr PageReader) {
+       c.Close()
        c.rdr, c.err = rdr, nil
        c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
        c.newDictionary = false
        c.numBuffered, c.numDecoded = 0, 0
 }
 
+// Close closes the page reader and the page if set.
+func (c *columnChunkReader) Close() error {
+       if c.curPage != nil {
+               c.curPage.Release()
+       }
+       if c.rdr != nil {
+               return c.rdr.Close()
+       }
+       return nil
+}
+
 func (c *columnChunkReader) getDefLvlBuffer(sz int64) []int16 {
        if int64(len(c.defLvlBuffer)) < sz {
                c.defLvlBuffer = make([]int16, sz)
@@ -340,6 +371,10 @@ func (c *columnChunkReader) readNewPage() bool {
                        return true
                }
        }
+
+       // If we get here, we're at the end of the column, and the page must
+       // have already been released. So set it to nil to avoid releasing 
twice.
+       c.curPage = nil
        c.err = c.rdr.Err()
        return false
 }
@@ -632,19 +667,18 @@ func (c *columnChunkReader) skipRows(nrows int64) error {
 
 type readerFunc func(int64, int64) (int, error)
 
-// base function for reading a batch of values, this will read until it either 
reads in batchSize values or
-// it hits the end of the column chunk, including reading multiple pages.
+// readBatch is base function for reading a batch of values, this will read 
until it either reads
+// in batchSize values or it hits the end of the column chunk, including 
reading multiple pages.
 //
-// totalValues is the total number of values which were read in, and thus 
would be the total number
+// totalLvls is the total number of values which were read in, and thus would 
be the total number
 // of definition levels and repetition levels which were populated (if they 
were non-nil). totalRead
 // is the number of physical values that were read in (ie: the number of 
non-null values)
 func (c *columnChunkReader) readBatch(batchSize int64, defLvls, repLvls 
[]int16, readFn readerFunc) (totalLvls int64, totalRead int, err error) {
        var (
-               read   int
-               defs   []int16
-               reps   []int16
-               ndefs  int
-               toRead int64
+               defs []int16
+               reps []int16
+               lvls int64
+               read int
        )
 
        for totalLvls < batchSize && c.HasNext() && err == nil {
@@ -654,22 +688,35 @@ func (c *columnChunkReader) readBatch(batchSize int64, 
defLvls, repLvls []int16,
                if repLvls != nil {
                        reps = repLvls[totalLvls:]
                }
-               ndefs, toRead, err = c.determineNumToRead(batchSize-totalLvls, 
defs, reps)
-               if err != nil {
-                       return totalLvls, totalRead, err
-               }
-
-               read, err = readFn(int64(totalRead), toRead)
-               // the total number of values processed here is the maximum of
-               // the number of definition levels or the number of physical 
values read.
-               // if this is a required field, ndefs will be 0 since there is 
no definition
-               // levels stored with it and `read` will be the number of 
values, otherwise
-               // we use ndefs since it will be equal to or greater than read.
-               totalVals := int64(utils.Max(ndefs, read))
-               c.consumeBufferedValues(totalVals)
-
-               totalLvls += totalVals
+               lvls, read, err = c.readBatchInPage(batchSize-totalLvls, 
int64(totalRead), defs, reps, readFn)
+               totalLvls += lvls
                totalRead += read
        }
        return totalLvls, totalRead, err
 }
+
+// readBatchInPage is a helper function for reading a batch of values. This 
function ensures
+// the read values are from the same page.
+//
+// TotalRead is the start index to pass to readFn.
+func (c *columnChunkReader) readBatchInPage(batchSize int64, totalRead int64, 
defLvls, repLvls []int16, readFn readerFunc) (lvls int64, read int, err error) {
+       if !c.HasNext() {
+               return 0, 0, c.err
+       }
+
+       ndefs, toRead, err := c.determineNumToRead(batchSize, defLvls, repLvls)
+       if err != nil {
+               return 0, 0, err
+       }
+
+       read, err = readFn(totalRead, toRead)
+       // the total number of values processed here is the maximum of
+       // the number of definition levels or the number of physical values 
read.
+       // if this is a required field, ndefs will be 0 since there is no 
definition
+       // levels stored with it and `read` will be the number of values, 
otherwise
+       // we use ndefs since it will be equal to or greater than read.
+       lvls = int64(utils.Max(ndefs, read))
+       c.consumeBufferedValues(lvls)
+
+       return lvls, read, err
+}
diff --git a/parquet/file/column_reader_types.gen.go 
b/parquet/file/column_reader_types.gen.go
index a60cf8e3..fe3e20e5 100644
--- a/parquet/file/column_reader_types.gen.go
+++ b/parquet/file/column_reader_types.gen.go
@@ -36,8 +36,10 @@ func (cr *Int32ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, while 
the latter one reads across
+// multiple pages to fill the batch.
+
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
 // defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
@@ -45,6 +47,12 @@ func (cr *Int32ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *Int32ColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]int32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.Int32Decoder).Decode(values[start : start+len])
+       })
+}
+
 func (cr *Int32ColumnChunkReader) ReadBatch(batchSize int64, values []int32, 
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
                return 
cr.curDecoder.(encoding.Int32Decoder).Decode(values[start : start+len])
@@ -64,8 +72,10 @@ func (cr *Int64ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, while 
the latter one reads across
+// multiple pages to fill the batch.
+
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
 // defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
@@ -73,6 +83,12 @@ func (cr *Int64ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *Int64ColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]int64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.Int64Decoder).Decode(values[start : start+len])
+       })
+}
+
 func (cr *Int64ColumnChunkReader) ReadBatch(batchSize int64, values []int64, 
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
                return 
cr.curDecoder.(encoding.Int64Decoder).Decode(values[start : start+len])
@@ -92,8 +108,10 @@ func (cr *Int96ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, while 
the latter one reads across
+// multiple pages to fill the batch.
+
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
 // defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
@@ -101,6 +119,12 @@ func (cr *Int96ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *Int96ColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]parquet.Int96, defLvls, repLvls []int16) (total int64, valuesRead int, err 
error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.Int96Decoder).Decode(values[start : start+len])
+       })
+}
+
 func (cr *Int96ColumnChunkReader) ReadBatch(batchSize int64, values 
[]parquet.Int96, defLvls, repLvls []int16) (total int64, valuesRead int, err 
error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
                return 
cr.curDecoder.(encoding.Int96Decoder).Decode(values[start : start+len])
@@ -120,8 +144,10 @@ func (cr *Float32ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, while 
the latter one reads across
+// multiple pages to fill the batch.
+
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
 // defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
@@ -129,6 +155,12 @@ func (cr *Float32ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *Float32ColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]float32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.Float32Decoder).Decode(values[start : start+len])
+       })
+}
+
 func (cr *Float32ColumnChunkReader) ReadBatch(batchSize int64, values 
[]float32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
                return 
cr.curDecoder.(encoding.Float32Decoder).Decode(values[start : start+len])
@@ -148,8 +180,10 @@ func (cr *Float64ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, while 
the latter one reads across
+// multiple pages to fill the batch.
+
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
 // defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
@@ -157,6 +191,12 @@ func (cr *Float64ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *Float64ColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]float64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.Float64Decoder).Decode(values[start : start+len])
+       })
+}
+
 func (cr *Float64ColumnChunkReader) ReadBatch(batchSize int64, values 
[]float64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
                return 
cr.curDecoder.(encoding.Float64Decoder).Decode(values[start : start+len])
@@ -176,8 +216,10 @@ func (cr *BooleanColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, while 
the latter one reads across
+// multiple pages to fill the batch.
+
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
 // defLvls and repLvls can be nil, or will be populated if not nil. If not 
nil, they must be
@@ -185,6 +227,12 @@ func (cr *BooleanColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *BooleanColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]bool, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.BooleanDecoder).Decode(values[start : start+len])
+       })
+}
+
 func (cr *BooleanColumnChunkReader) ReadBatch(batchSize int64, values []bool, 
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
                return 
cr.curDecoder.(encoding.BooleanDecoder).Decode(values[start : start+len])
@@ -204,7 +252,12 @@ func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, and 
might be shallow copies of the
+// underlying page data, while the latter one reads across multiple pages to 
fill the batch and values
+// have been cloned. User should choose the appropriate one based on their use 
cases. For example, if user
+// only needs to read and process values one by one, ReadBatchInPage is more 
efficient. Otherwise, if user
+// want to cache the values for later use, ReadBatch is more suitable.
 //
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
@@ -213,9 +266,19 @@ func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *ByteArrayColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]parquet.ByteArray, defLvls, repLvls []int16) (total int64, valuesRead int, 
err error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+       })
+}
+
 func (cr *ByteArrayColumnChunkReader) ReadBatch(batchSize int64, values 
[]parquet.ByteArray, defLvls, repLvls []int16) (total int64, valuesRead int, 
err error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
-               return 
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+               n, err := 
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+               if err == nil {
+                       cloneByteArray(values[start : start+len])
+               }
+               return n, err
        })
 }
 
@@ -232,7 +295,12 @@ func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues 
int64) (int64, error)
        return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are
+// used in test. The former one only reads values from the same page, and 
might be shallow copies of the
+// underlying page data, while the latter one reads across multiple pages to 
fill the batch and values
+// have been cloned. User should choose the appropriate one based on their use 
cases. For example, if user
+// only needs to read and process values one by one, ReadBatchInPage is more 
efficient. Otherwise, if user
+// want to cache the values for later use, ReadBatch is more suitable.
 //
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
@@ -241,8 +309,18 @@ func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues 
int64) (int64, error)
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *FixedLenByteArrayColumnChunkReader) ReadBatchInPage(batchSize int64, 
values []parquet.FixedLenByteArray, defLvls, repLvls []int16) (total int64, 
valuesRead int, err error) {
+       return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, 
len int64) (int, error) {
+               return 
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start : 
start+len])
+       })
+}
+
 func (cr *FixedLenByteArrayColumnChunkReader) ReadBatch(batchSize int64, 
values []parquet.FixedLenByteArray, defLvls, repLvls []int16) (total int64, 
valuesRead int, err error) {
        return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
-               return 
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start : 
start+len])
+               n, err := 
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start : 
start+len])
+               if err == nil {
+                       cloneByteArray(values[start : start+len])
+               }
+               return n, err
        })
 }
diff --git a/parquet/file/column_reader_types.gen.go.tmpl 
b/parquet/file/column_reader_types.gen.go.tmpl
index a9d5bc0d..3c959ddc 100644
--- a/parquet/file/column_reader_types.gen.go.tmpl
+++ b/parquet/file/column_reader_types.gen.go.tmpl
@@ -35,7 +35,18 @@ func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
   return nvalues, err
 }
 
-// ReadBatch reads batchSize values from the column.
+{{- if .isByteArray}}
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are 
+// used in test. The former one only reads values from the same page, and 
might be shallow copies of the
+// underlying page data, while the latter one reads across multiple pages to 
fill the batch and values
+// have been cloned. User should choose the appropriate one based on their use 
cases. For example, if user
+// only needs to read and process values one by one, ReadBatchInPage is more 
efficient. Otherwise, if user
+// want to cache the values for later use, ReadBatch is more suitable.
+{{- else}}
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the 
column, currently they are 
+// used in test. The former one only reads values from the same page, while 
the latter one reads across
+// multiple pages to fill the batch.
+{{end}}
 //
 // Returns error if values is not at least big enough to hold the number of 
values that will be read.
 //
@@ -44,9 +55,23 @@ func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64) 
(int64, error) {
 //
 // total is the number of rows that were read, valuesRead is the actual number 
of physical values
 // that were read excluding nulls
+func (cr *{{.Name}}ColumnChunkReader) ReadBatchInPage(batchSize int64, values 
[]{{.name}}, defLvls, repLvls []int16) (total int64, valuesRead int, err error) 
{
+  return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, len 
int64) (int, error) {
+    return 
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+  })
+}
+
 func (cr *{{.Name}}ColumnChunkReader) ReadBatch(batchSize int64, values 
[]{{.name}}, defLvls, repLvls []int16) (total int64, valuesRead int, err error) 
{
   return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64) 
(int, error) {
-    return 
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+{{- if .isByteArray}}
+    n, err := 
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+    if err == nil {
+      cloneByteArray(values[start : start+len])
+    }
+    return n, err
+{{- else}}
+    return cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start : 
start+len])
+{{- end}}
   })
 }
-{{end}}
+{{end}}
\ No newline at end of file
diff --git a/parquet/file/column_writer_test.go 
b/parquet/file/column_writer_test.go
index 90b239e4..f63bd45b 100644
--- a/parquet/file/column_writer_test.go
+++ b/parquet/file/column_writer_test.go
@@ -292,12 +292,41 @@ func (p *PrimitiveWriterTestSuite) 
readColumnFully(compression compress.Compress
                read := p.ReadBatch(reader, totalValues-valuesRead, valuesRead, 
p.DefLevelsOut[valuesRead:], p.RepLevelsOut[valuesRead:])
                valuesRead += read
        }
+       p.NoError(reader.Close())
        return valuesRead
 }
 
+// checkReadColumnByPage is used to check that the data read from the two interfaces
is the same
+func (p *PrimitiveWriterTestSuite) checkReadColumnByPage(compression 
compress.Compression) {
+       if p.descr.PhysicalType() != parquet.Types.FixedLenByteArray && 
p.descr.PhysicalType() != parquet.Types.ByteArray {
+               return
+       }
+       totalValues := int64(len(p.DefLevelsOut))
+       pagereader, _ := 
file.NewPageReader(arrutils.NewByteReader(p.readbuffer.Bytes()), totalValues, 
compression, mem, nil)
+       reader := file.NewColumnReader(p.descr, pagereader, mem, &p.bufferPool)
+
+       valuesRead := int64(0)
+       for valuesRead < totalValues {
+               read := p.ReadBatchInPage(reader, totalValues-valuesRead, 
p.DefLevelsOut, p.RepLevelsOut)
+               switch reader.(type) {
+               case *file.ByteArrayColumnChunkReader:
+                       batchOut := p.ValuesOut.([]parquet.ByteArray)
+                       pageOut := p.ValuesOutPage.([]parquet.ByteArray)
+                       p.Equal(batchOut[valuesRead:valuesRead+read], 
pageOut[:read])
+               case *file.FixedLenByteArrayColumnChunkReader:
+                       batchOut := p.ValuesOut.([]parquet.FixedLenByteArray)
+                       pageOut := p.ValuesOutPage.([]parquet.FixedLenByteArray)
+                       p.Equal(batchOut[valuesRead:valuesRead+read], 
pageOut[:read])
+               }
+               valuesRead += read
+       }
+       p.NoError(reader.Close())
+}
+
 func (p *PrimitiveWriterTestSuite) readAndCompare(compression 
compress.Compression, nrows int64) {
        p.SetupValuesOut(nrows)
        p.readColumnFully(compression)
+       p.checkReadColumnByPage(compression)
        p.Equal(p.Values, p.ValuesOut)
 }
 
@@ -394,6 +423,7 @@ func (p *PrimitiveWriterTestSuite) 
testDictionaryFallbackEncoding(version parque
        // Read all the rows so that we are sure that also the non-dictionary 
pages are read correctly
        p.SetupValuesOut(VeryLargeSize)
        valuesRead := p.readColumnFully(compress.Codecs.Uncompressed)
+       p.checkReadColumnByPage(compress.Codecs.Uncompressed)
        p.EqualValues(VeryLargeSize, valuesRead)
        p.Equal(p.Values, p.ValuesOut)
 
diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go
index 45ce0676..1641a680 100644
--- a/parquet/file/file_reader_test.go
+++ b/parquet/file/file_reader_test.go
@@ -99,6 +99,7 @@ type PageSerdeSuite struct {
        dataPageHdr   format.DataPageHeader
        dataPageHdrV2 format.DataPageHeaderV2
 
+       alloc      *memory.CheckedAllocator
        pageReader file.PageReader
 }
 
@@ -126,7 +127,8 @@ func (p *PageSerdeSuite) SetupTest() {
 func (p *PageSerdeSuite) InitSerializedPageReader(nrows int64, codec 
compress.Compression) {
        p.EndStream()
 
-       p.pageReader, _ = 
file.NewPageReader(utils.NewByteReader(p.buffer.Bytes()), nrows, codec, 
memory.DefaultAllocator, nil)
+       p.alloc = memory.NewCheckedAllocator(memory.NewGoAllocator())
+       p.pageReader, _ = 
file.NewPageReader(utils.NewByteReader(p.buffer.Bytes()), nrows, codec, 
p.alloc, nil)
 }
 
 func (p *PageSerdeSuite) WriteDataPageHeader(maxSerialized int, uncompressed, 
compressed int32) {
@@ -193,6 +195,8 @@ func (p *PageSerdeSuite) TestDataPageV1() {
        p.True(p.pageReader.Next())
        currentPage := p.pageReader.Page()
        p.CheckDataPageHeader(p.dataPageHdr, currentPage)
+       p.NoError(p.pageReader.Close())
+       p.alloc.AssertSize(p.T(), 0)
 }
 
 func (p *PageSerdeSuite) TestDataPageV2() {
@@ -200,12 +204,15 @@ func (p *PageSerdeSuite) TestDataPageV2() {
                statsSize = 512
                nrows     = 4444
        )
+
        p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
        p.dataPageHdrV2.NumValues = nrows
-       p.WriteDataPageHeaderV2(1024, 0, 0)
+       p.WriteDataPageHeaderV2(1024, 20, 10)
        p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
        p.True(p.pageReader.Next())
        p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
+       p.pageReader.Close()
+       p.alloc.AssertSize(p.T(), 0)
 }
 
 func (p *PageSerdeSuite) TestLargePageHeaders() {
@@ -227,6 +234,8 @@ func (p *PageSerdeSuite) TestLargePageHeaders() {
        p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
        p.True(p.pageReader.Next())
        p.CheckDataPageHeader(p.dataPageHdr, p.pageReader.Page())
+       p.NoError(p.pageReader.Close())
+       p.alloc.AssertSize(p.T(), 0)
 }
 
 func (p *PageSerdeSuite) TestFailLargePageHeaders() {
@@ -247,6 +256,8 @@ func (p *PageSerdeSuite) TestFailLargePageHeaders() {
        p.pageReader.SetMaxPageHeaderSize(smallerMaxSize)
        p.NotPanics(func() { p.False(p.pageReader.Next()) })
        p.Error(p.pageReader.Err())
+       p.NoError(p.pageReader.Close())
+       p.alloc.AssertSize(p.T(), 0)
 }
 
 func (p *PageSerdeSuite) TestCompression() {
@@ -290,6 +301,8 @@ func (p *PageSerdeSuite) TestCompression() {
                                p.IsType(&file.DataPageV1{}, page)
                                p.Equal(data, page.Data())
                        }
+                       p.pageReader.Close()
+                       p.alloc.AssertSize(p.T(), 0)
                        p.ResetStream()
                })
        }
diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go
index ec1f7eb5..05522980 100644
--- a/parquet/file/file_writer_test.go
+++ b/parquet/file/file_writer_test.go
@@ -714,6 +714,7 @@ func (t *PageIndexRoundTripSuite) readPageIndexes(expectNumRG, expectNumPages in
                                }
                        }
 
+                       t.Require().NoError(pgRdr.Close())
                        t.Require().NoError(pgRdr.Err())
                }
        }
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 92cc2f0e..1ba7ecbe 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -53,6 +53,8 @@ type PageReader interface {
        // Get the dictionary page for this column chunk
        GetDictionaryPage() (*DictionaryPage, error)
        SeekToPageWithRow(rowIdx int64) error
+       // Close releases the resources held by the reader.
+       Close() error
 }
 
 type PageType = format.PageType
@@ -369,14 +371,29 @@ type serializedPageReader struct {
 
        baseOffset, dataOffset, dictOffset int64
 
-       decompressBuffer bytes.Buffer
+       decompressBuffer *memory.Buffer
+       dataPageBuffer   *memory.Buffer
+       dictPageBuffer   *memory.Buffer
        err              error
 }
 
+func (p *serializedPageReader) Close() error {
+       if p.decompressBuffer != nil {
+               p.decompressBuffer.Release()
+               p.dictPageBuffer.Release()
+               p.dataPageBuffer.Release()
+       }
+       return nil
+}
+
 func (p *serializedPageReader) init(compressType compress.Compression, ctx *CryptoContext) error {
        if p.mem == nil {
                p.mem = memory.NewGoAllocator()
        }
+       p.decompressBuffer = memory.NewResizableBuffer(p.mem)
+       p.dataPageBuffer = memory.NewResizableBuffer(p.mem)
+       p.dictPageBuffer = memory.NewResizableBuffer(p.mem)
+       p.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize)
 
        codec, err := compress.GetCodec(compressType)
        if err != nil {
@@ -384,10 +401,6 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp
        }
        p.codec = codec
 
-       if p.decompressBuffer.Cap() < defaultPageHeaderSize {
-               p.decompressBuffer.Grow(defaultPageHeaderSize - p.decompressBuffer.Cap())
-       }
-
        if ctx != nil {
                p.cryptoCtx = *ctx
                p.initDecryption()
@@ -423,8 +436,12 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
                nrows:             nrows,
                mem:               mem,
                codec:             codec,
+
+               decompressBuffer: memory.NewResizableBuffer(mem),
+               dataPageBuffer:   memory.NewResizableBuffer(mem),
+               dictPageBuffer:   memory.NewResizableBuffer(mem),
        }
-       rdr.decompressBuffer.Grow(defaultPageHeaderSize)
+       rdr.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize)
        if ctx != nil {
                rdr.cryptoCtx = *ctx
                rdr.initDecryption()
@@ -441,7 +458,6 @@ func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, comp
        if p.err != nil {
                return
        }
-       p.decompressBuffer.Reset()
        if ctx != nil {
                p.cryptoCtx = *ctx
                p.initDecryption()
@@ -485,8 +501,9 @@ func (p *serializedPageReader) Page() Page {
 }
 
 func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
-       p.decompressBuffer.Grow(lenCompressed)
-       if _, err := io.CopyN(&p.decompressBuffer, rd, int64(lenCompressed)); err != nil {
+       p.decompressBuffer.ResizeNoShrink(lenCompressed)
+       b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
+       if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
                return nil, err
        }
 
@@ -544,7 +561,6 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
                }
 
                p.cryptoCtx.StartDecryptWithDictionaryPage = true
-               p.decompressBuffer.Reset()
                if p.cryptoCtx.DataDecryptor != nil {
                        p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
                }
@@ -560,9 +576,8 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
-                       return nil, errors.New("parquet: invalid page header (negative number of values)")
                }
 
-               buf := memory.NewResizableBuffer(p.mem)
-               defer buf.Release()
-               buf.ResizeNoShrink(lenUncompressed)
+               p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
+               buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
 
                data, err := p.decompress(rd, lenCompressed, buf.Bytes())
                if err != nil {
@@ -574,7 +589,7 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
 
                return &DictionaryPage{
                        page: page{
-                               buf:      memory.NewBufferBytes(data),
+                               buf:      buf,
                                typ:      hdr.Type,
                                nvals:    dictHeader.GetNumValues(),
                                encoding: dictHeader.GetEncoding(),
@@ -682,7 +697,6 @@ func (p *serializedPageReader) Next() bool {
        p.err = nil
 
        for p.rowsSeen < p.nrows {
-               p.decompressBuffer.Reset()
                if err := p.readPageHeader(p.r, p.curPageHdr); err != nil {
                        if err != io.EOF {
                                p.err = err
@@ -702,10 +716,6 @@ func (p *serializedPageReader) Next() bool {
                        p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
                }
 
-               buf := memory.NewResizableBuffer(p.mem)
-               defer buf.Release()
-               buf.ResizeNoShrink(lenUncompressed)
-
                switch p.curPageHdr.GetType() {
                case format.PageType_DICTIONARY_PAGE:
                        p.cryptoCtx.StartDecryptWithDictionaryPage = false
@@ -715,6 +725,9 @@ func (p *serializedPageReader) Next() bool {
                                return false
                        }
 
+                       p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
+                       buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
+
                        data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
                        if err != nil {
                                p.err = err
@@ -728,7 +741,7 @@ func (p *serializedPageReader) Next() bool {
                        // make dictionary page
                        p.curPage = &DictionaryPage{
                                page: page{
-                                       buf:      memory.NewBufferBytes(data),
+                                       buf:      buf,
                                        typ:      p.curPageHdr.Type,
                                        nvals:    dictHeader.GetNumValues(),
                                        encoding: dictHeader.GetEncoding(),
@@ -744,6 +757,9 @@ func (p *serializedPageReader) Next() bool {
                                return false
                        }
 
+                       p.dataPageBuffer.ResizeNoShrink(lenUncompressed)
+                       buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes())
+
                        firstRowIdx := p.rowsSeen
                        p.rowsSeen += int64(dataHeader.GetNumValues())
                        data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
@@ -759,7 +775,7 @@ func (p *serializedPageReader) Next() bool {
                        // make datapagev1
                        p.curPage = &DataPageV1{
                                page: page{
-                                       buf:      memory.NewBufferBytes(data),
+                                       buf:      buf,
                                        typ:      p.curPageHdr.Type,
                                        nvals:    dataHeader.GetNumValues(),
                                        encoding: dataHeader.GetEncoding(),
@@ -783,6 +799,9 @@ func (p *serializedPageReader) Next() bool {
                                return false
                        }
 
+                       p.dataPageBuffer.ResizeNoShrink(lenUncompressed)
+                       buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes())
+
                        compressed := dataHeader.GetIsCompressed()
                        // extract stats
                        firstRowIdx := p.rowsSeen
@@ -803,7 +822,6 @@ func (p *serializedPageReader) Next() bool {
                        } else {
                                io.ReadFull(p.r, buf.Bytes())
                        }
-                       buf.Retain()
 
                        if buf.Len() != lenUncompressed {
                                p.err = fmt.Errorf("parquet: metadata said %d bytes uncompressed data page, got %d bytes", lenUncompressed, buf.Len())
diff --git a/parquet/file/record_reader.go b/parquet/file/record_reader.go
index a21e0665..2a7aba90 100644
--- a/parquet/file/record_reader.go
+++ b/parquet/file/record_reader.go
@@ -368,6 +368,7 @@ func (rr *recordReader) Release() {
                rr.defLevels.Release()
                rr.repLevels.Release()
                rr.defLevels, rr.repLevels = nil, nil
+               rr.Close()
        }
 }
 
diff --git a/parquet/internal/encoding/physical_types.tmpldata b/parquet/internal/encoding/physical_types.tmpldata
index 0adeb995..b5907266 100644
--- a/parquet/internal/encoding/physical_types.tmpldata
+++ b/parquet/internal/encoding/physical_types.tmpldata
@@ -3,50 +3,58 @@
     "Name": "Int32",
     "name": "int32",
     "lower": "int32",
-    "prefix": "arrow"
+    "prefix": "arrow",
+    "isByteArray": false
   },
   {
     "Name": "Int64",
     "name": "int64",
     "lower": "int64",
-    "prefix": "arrow"
+    "prefix": "arrow",
+    "isByteArray": false
   },
   {
     "Name": "Int96",
     "name": "parquet.Int96",
     "lower": "int96",
-    "prefix": "parquet"
+    "prefix": "parquet",
+    "isByteArray": false
   },
   {
     "Name": "Float32",
     "name": "float32",
     "lower": "float32",
     "prefix": "arrow",
-    "physical": "Float"
+    "physical": "Float",
+    "isByteArray": false
   },
   {
     "Name": "Float64",
     "name": "float64",
     "lower": "float64",
     "prefix": "arrow",
-    "physical": "Double"
+    "physical": "Double",
+    "isByteArray": false
   },
   {
     "Name": "Boolean",
     "name": "bool",
     "lower": "bool",
-    "prefix": "arrow"
+    "prefix": "arrow",
+    "isByteArray": false
   },
   {
     "Name": "ByteArray",
     "name": "parquet.ByteArray",
     "lower": "byteArray",
-    "prefix": "parquet"
+    "prefix": "parquet",
+    "isByteArray": true
   },
   {
     "Name": "FixedLenByteArray",
     "name": "parquet.FixedLenByteArray",
     "lower": "fixedLenByteArray",
-    "prefix": "parquet"
+    "prefix": "parquet",
+    "isByteArray": true
   }
 ]
diff --git a/parquet/internal/testutils/pagebuilder.go b/parquet/internal/testutils/pagebuilder.go
index 41c2ba6f..95ddf57f 100644
--- a/parquet/internal/testutils/pagebuilder.go
+++ b/parquet/internal/testutils/pagebuilder.go
@@ -234,6 +234,10 @@ type MockPageReader struct {
        curpage int
 }
 
+func (m *MockPageReader) Close() error {
+       return nil
+}
+
 func (m *MockPageReader) Err() error {
        return m.Called().Error(0)
 }
diff --git a/parquet/internal/testutils/primitive_typed.go b/parquet/internal/testutils/primitive_typed.go
index 927ad63a..3f281f32 100644
--- a/parquet/internal/testutils/primitive_typed.go
+++ b/parquet/internal/testutils/primitive_typed.go
@@ -38,9 +38,10 @@ type PrimitiveTypedTest struct {
        Buffer    *memory.Buffer
        Values    interface{}
 
-       ValuesOut    interface{}
-       DefLevelsOut []int16
-       RepLevelsOut []int16
+       ValuesOut     interface{}
+       ValuesOutPage interface{}
+       DefLevelsOut  []int16
+       RepLevelsOut  []int16
 }
 
 func NewPrimitiveTypedTest(typ reflect.Type) PrimitiveTypedTest {
@@ -49,6 +50,7 @@ func NewPrimitiveTypedTest(typ reflect.Type) PrimitiveTypedTest {
 
 func (p *PrimitiveTypedTest) SetupValuesOut(nvalues int64) {
        p.ValuesOut = reflect.MakeSlice(reflect.SliceOf(p.Typ), int(nvalues), int(nvalues)).Interface()
+       p.ValuesOutPage = reflect.MakeSlice(reflect.SliceOf(p.Typ), int(nvalues), int(nvalues)).Interface()
        p.DefLevelsOut = make([]int16, nvalues)
        p.RepLevelsOut = make([]int16, nvalues)
 }
@@ -252,6 +254,37 @@ func (p *PrimitiveTypedTest) ReadBatch(reader file.ColumnChunkReader, batch, val
        }
 }
 
+func (p *PrimitiveTypedTest) ReadBatchInPage(reader file.ColumnChunkReader, batch int64, defLevels, repLevels []int16) int64 {
+       switch r := reader.(type) {
+       case *file.Int32ColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]int32), defLevels, repLevels)
+               return int64(read)
+       case *file.Int64ColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]int64), defLevels, repLevels)
+               return int64(read)
+       case *file.Float32ColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]float32), defLevels, repLevels)
+               return int64(read)
+       case *file.Float64ColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]float64), defLevels, repLevels)
+               return int64(read)
+       case *file.Int96ColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]parquet.Int96), defLevels, repLevels)
+               return int64(read)
+       case *file.ByteArrayColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]parquet.ByteArray), defLevels, repLevels)
+               return int64(read)
+       case *file.BooleanColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]bool), defLevels, repLevels)
+               return int64(read)
+       case *file.FixedLenByteArrayColumnChunkReader:
+               _, read, _ := r.ReadBatchInPage(batch, p.ValuesOutPage.([]parquet.FixedLenByteArray), defLevels, repLevels)
+               return int64(read)
+       default:
+               panic("unimplemented")
+       }
+}
+
 func Min(v1, v2 interface{}) interface{} {
        switch n1 := v1.(type) {
        case int32:
diff --git a/parquet/pqarrow/encode_dictionary_test.go b/parquet/pqarrow/encode_dictionary_test.go
index ac6d4e50..e47b7e6b 100644
--- a/parquet/pqarrow/encode_dictionary_test.go
+++ b/parquet/pqarrow/encode_dictionary_test.go
@@ -290,6 +290,7 @@ func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() {
                                }
 
                                ad.False(pr.Next())
+                               ad.NoError(pr.Close())
                        }
                })
        }


Reply via email to