This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new f0b6fd9e feat(parquet): utilize memory allocator in
`serializedPageReader` (#485)
f0b6fd9e is described below
commit f0b6fd9eacfd244cdef200a6115873e8279f4297
Author: Ruihao Chen <[email protected]>
AuthorDate: Tue Sep 9 04:26:54 2025 +0800
feat(parquet): utilize memory allocator in `serializedPageReader` (#485)
### Rationale for this change
Previously, only data page v2 in the reader would use memory
allocated from the allocator, and there appears to be some misuse of the
allocator.
### What changes are included in this PR?
Use memory allocator to allocate buffer for `serializedPageReader`, and
introduce `Close` function to make sure all the memory will be freed
after usage.
### Are these changes tested?
Track the memory usage in existing tests to ensure all the allocated
memory is freed.
### Are there any user-facing changes?
For users who have not explicitly set an allocator, they don't need to
change anything. Otherwise, they should remember to call
`ColumnChunkReader.Close` after reading each chunk.
---------
Signed-off-by: Ruihao Chen <[email protected]>
Signed-off-by: Ruihao Chen <[email protected]>
---
parquet/file/column_reader.go | 93 +++++++++++++-----
parquet/file/column_reader_types.gen.go | 110 ++++++++++++++++++----
parquet/file/column_reader_types.gen.go.tmpl | 31 +++++-
parquet/file/column_writer_test.go | 30 ++++++
parquet/file/file_reader_test.go | 17 +++-
parquet/file/file_writer_test.go | 1 +
parquet/file/page_reader.go | 62 +++++++-----
parquet/file/record_reader.go | 1 +
parquet/internal/encoding/physical_types.tmpldata | 24 +++--
parquet/internal/testutils/pagebuilder.go | 4 +
parquet/internal/testutils/primitive_typed.go | 39 +++++++-
parquet/pqarrow/encode_dictionary_test.go | 1 +
12 files changed, 336 insertions(+), 77 deletions(-)
diff --git a/parquet/file/column_reader.go b/parquet/file/column_reader.go
index 03ca5a8f..5597689b 100644
--- a/parquet/file/column_reader.go
+++ b/parquet/file/column_reader.go
@@ -38,6 +38,23 @@ const (
defaultPageHeaderSize = 16 * 1024
)
+// cloneByteArray is a helper function to clone a slice of byte slices
+func cloneByteArray[T ~[]byte](src []T) {
+ totalLength := 0
+ for i := range src {
+ totalLength += len(src[i])
+ }
+
+ buf := make([]byte, totalLength)
+ pos := 0
+ for i := range src {
+ srcLen := len(src[i])
+ copy(buf[pos:pos+srcLen], src[i])
+ src[i] = T(buf[pos : pos+srcLen])
+ pos += srcLen
+ }
+}
+
//go:generate go run ../../arrow/_tools/tmpl/main.go -i
-data=../internal/encoding/physical_types.tmpldata
column_reader_types.gen.go.tmpl
func isDictIndexEncoding(e format.Encoding) bool {
@@ -113,6 +130,8 @@ type ColumnChunkReader interface {
// automatically read the first page of the page reader passed in until
// HasNext which will read in the next page.
setPageReader(PageReader)
+ // Close releases the resources held by the column reader.
+ Close() error
}
type columnChunkReader struct {
@@ -221,12 +240,24 @@ func (c *columnChunkReader) consumeBufferedValues(n
int64) { c.numDecoded += n }
func (c *columnChunkReader) numAvailValues() int64 { return
c.numBuffered - c.numDecoded }
func (c *columnChunkReader) pager() PageReader { return c.rdr }
func (c *columnChunkReader) setPageReader(rdr PageReader) {
+ c.Close()
c.rdr, c.err = rdr, nil
c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
c.newDictionary = false
c.numBuffered, c.numDecoded = 0, 0
}
+// Close closes the page reader and the page if set.
+func (c *columnChunkReader) Close() error {
+ if c.curPage != nil {
+ c.curPage.Release()
+ }
+ if c.rdr != nil {
+ return c.rdr.Close()
+ }
+ return nil
+}
+
func (c *columnChunkReader) getDefLvlBuffer(sz int64) []int16 {
if int64(len(c.defLvlBuffer)) < sz {
c.defLvlBuffer = make([]int16, sz)
@@ -340,6 +371,10 @@ func (c *columnChunkReader) readNewPage() bool {
return true
}
}
+
+ // If we get here, we're at the end of the column, and the page must
+ // have already been released. So set it to nil to avoid releasing
twice.
+ c.curPage = nil
c.err = c.rdr.Err()
return false
}
@@ -632,19 +667,18 @@ func (c *columnChunkReader) skipRows(nrows int64) error {
type readerFunc func(int64, int64) (int, error)
-// base function for reading a batch of values, this will read until it either
reads in batchSize values or
-// it hits the end of the column chunk, including reading multiple pages.
+// readBatch is base function for reading a batch of values, this will read
until it either reads
+// in batchSize values or it hits the end of the column chunk, including
reading multiple pages.
//
-// totalValues is the total number of values which were read in, and thus
would be the total number
+// totalLvls is the total number of values which were read in, and thus would
be the total number
// of definition levels and repetition levels which were populated (if they
were non-nil). totalRead
// is the number of physical values that were read in (ie: the number of
non-null values)
func (c *columnChunkReader) readBatch(batchSize int64, defLvls, repLvls
[]int16, readFn readerFunc) (totalLvls int64, totalRead int, err error) {
var (
- read int
- defs []int16
- reps []int16
- ndefs int
- toRead int64
+ defs []int16
+ reps []int16
+ lvls int64
+ read int
)
for totalLvls < batchSize && c.HasNext() && err == nil {
@@ -654,22 +688,35 @@ func (c *columnChunkReader) readBatch(batchSize int64,
defLvls, repLvls []int16,
if repLvls != nil {
reps = repLvls[totalLvls:]
}
- ndefs, toRead, err = c.determineNumToRead(batchSize-totalLvls,
defs, reps)
- if err != nil {
- return totalLvls, totalRead, err
- }
-
- read, err = readFn(int64(totalRead), toRead)
- // the total number of values processed here is the maximum of
- // the number of definition levels or the number of physical
values read.
- // if this is a required field, ndefs will be 0 since there is
no definition
- // levels stored with it and `read` will be the number of
values, otherwise
- // we use ndefs since it will be equal to or greater than read.
- totalVals := int64(utils.Max(ndefs, read))
- c.consumeBufferedValues(totalVals)
-
- totalLvls += totalVals
+ lvls, read, err = c.readBatchInPage(batchSize-totalLvls,
int64(totalRead), defs, reps, readFn)
+ totalLvls += lvls
totalRead += read
}
return totalLvls, totalRead, err
}
+
+// readBatchInPage is a helper function for reading a batch of values. This
function ensures
+// the read values are from the same page.
+//
+// TotalRead is the start index to pass to readFn.
+func (c *columnChunkReader) readBatchInPage(batchSize int64, totalRead int64,
defLvls, repLvls []int16, readFn readerFunc) (lvls int64, read int, err error) {
+ if !c.HasNext() {
+ return 0, 0, c.err
+ }
+
+ ndefs, toRead, err := c.determineNumToRead(batchSize, defLvls, repLvls)
+ if err != nil {
+ return 0, 0, err
+ }
+
+ read, err = readFn(totalRead, toRead)
+ // the total number of values processed here is the maximum of
+ // the number of definition levels or the number of physical values
read.
+ // if this is a required field, ndefs will be 0 since there is no
definition
+ // levels stored with it and `read` will be the number of values,
otherwise
+ // we use ndefs since it will be equal to or greater than read.
+ lvls = int64(utils.Max(ndefs, read))
+ c.consumeBufferedValues(lvls)
+
+ return lvls, read, err
+}
diff --git a/parquet/file/column_reader_types.gen.go
b/parquet/file/column_reader_types.gen.go
index a60cf8e3..fe3e20e5 100644
--- a/parquet/file/column_reader_types.gen.go
+++ b/parquet/file/column_reader_types.gen.go
@@ -36,8 +36,10 @@ func (cr *Int32ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, while
the latter one reads across
+// multiple pages to fill the batch.
+
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
@@ -45,6 +47,12 @@ func (cr *Int32ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *Int32ColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]int32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.Int32Decoder).Decode(values[start : start+len])
+ })
+}
+
func (cr *Int32ColumnChunkReader) ReadBatch(batchSize int64, values []int32,
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
return
cr.curDecoder.(encoding.Int32Decoder).Decode(values[start : start+len])
@@ -64,8 +72,10 @@ func (cr *Int64ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, while
the latter one reads across
+// multiple pages to fill the batch.
+
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
@@ -73,6 +83,12 @@ func (cr *Int64ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *Int64ColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]int64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.Int64Decoder).Decode(values[start : start+len])
+ })
+}
+
func (cr *Int64ColumnChunkReader) ReadBatch(batchSize int64, values []int64,
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
return
cr.curDecoder.(encoding.Int64Decoder).Decode(values[start : start+len])
@@ -92,8 +108,10 @@ func (cr *Int96ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, while
the latter one reads across
+// multiple pages to fill the batch.
+
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
@@ -101,6 +119,12 @@ func (cr *Int96ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *Int96ColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]parquet.Int96, defLvls, repLvls []int16) (total int64, valuesRead int, err
error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.Int96Decoder).Decode(values[start : start+len])
+ })
+}
+
func (cr *Int96ColumnChunkReader) ReadBatch(batchSize int64, values
[]parquet.Int96, defLvls, repLvls []int16) (total int64, valuesRead int, err
error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
return
cr.curDecoder.(encoding.Int96Decoder).Decode(values[start : start+len])
@@ -120,8 +144,10 @@ func (cr *Float32ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, while
the latter one reads across
+// multiple pages to fill the batch.
+
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
@@ -129,6 +155,12 @@ func (cr *Float32ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *Float32ColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]float32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.Float32Decoder).Decode(values[start : start+len])
+ })
+}
+
func (cr *Float32ColumnChunkReader) ReadBatch(batchSize int64, values
[]float32, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
return
cr.curDecoder.(encoding.Float32Decoder).Decode(values[start : start+len])
@@ -148,8 +180,10 @@ func (cr *Float64ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, while
the latter one reads across
+// multiple pages to fill the batch.
+
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
@@ -157,6 +191,12 @@ func (cr *Float64ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *Float64ColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]float64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.Float64Decoder).Decode(values[start : start+len])
+ })
+}
+
func (cr *Float64ColumnChunkReader) ReadBatch(batchSize int64, values
[]float64, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
return
cr.curDecoder.(encoding.Float64Decoder).Decode(values[start : start+len])
@@ -176,8 +216,10 @@ func (cr *BooleanColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
-//
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, while
the latter one reads across
+// multiple pages to fill the batch.
+
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
// defLvls and repLvls can be nil, or will be populated if not nil. If not
nil, they must be
@@ -185,6 +227,12 @@ func (cr *BooleanColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *BooleanColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]bool, defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.BooleanDecoder).Decode(values[start : start+len])
+ })
+}
+
func (cr *BooleanColumnChunkReader) ReadBatch(batchSize int64, values []bool,
defLvls, repLvls []int16) (total int64, valuesRead int, err error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
return
cr.curDecoder.(encoding.BooleanDecoder).Decode(values[start : start+len])
@@ -204,7 +252,12 @@ func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, and
might be shallow copies of the
+// underlying page data, while the latter one reads across multiple pages to
fill the batch and values
+// have been cloned. User should choose the appropriate one based on their use
cases. For example, if user
+// only needs to read and process values one by one, ReadBatchInPage is more
efficient. Otherwise, if user
+// want to cache the values for later use, ReadBatch is more suitable.
//
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
@@ -213,9 +266,19 @@ func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *ByteArrayColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]parquet.ByteArray, defLvls, repLvls []int16) (total int64, valuesRead int,
err error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+ })
+}
+
func (cr *ByteArrayColumnChunkReader) ReadBatch(batchSize int64, values
[]parquet.ByteArray, defLvls, repLvls []int16) (total int64, valuesRead int,
err error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
- return
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+ n, err :=
cr.curDecoder.(encoding.ByteArrayDecoder).Decode(values[start : start+len])
+ if err == nil {
+ cloneByteArray(values[start : start+len])
+ }
+ return n, err
})
}
@@ -232,7 +295,12 @@ func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues
int64) (int64, error)
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, and
might be shallow copies of the
+// underlying page data, while the latter one reads across multiple pages to
fill the batch and values
+// have been cloned. User should choose the appropriate one based on their use
cases. For example, if user
+// only needs to read and process values one by one, ReadBatchInPage is more
efficient. Otherwise, if user
+// want to cache the values for later use, ReadBatch is more suitable.
//
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
@@ -241,8 +309,18 @@ func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues
int64) (int64, error)
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *FixedLenByteArrayColumnChunkReader) ReadBatchInPage(batchSize int64,
values []parquet.FixedLenByteArray, defLvls, repLvls []int16) (total int64,
valuesRead int, err error) {
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start,
len int64) (int, error) {
+ return
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start :
start+len])
+ })
+}
+
func (cr *FixedLenByteArrayColumnChunkReader) ReadBatch(batchSize int64,
values []parquet.FixedLenByteArray, defLvls, repLvls []int16) (total int64,
valuesRead int, err error) {
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
- return
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start :
start+len])
+ n, err :=
cr.curDecoder.(encoding.FixedLenByteArrayDecoder).Decode(values[start :
start+len])
+ if err == nil {
+ cloneByteArray(values[start : start+len])
+ }
+ return n, err
})
}
diff --git a/parquet/file/column_reader_types.gen.go.tmpl
b/parquet/file/column_reader_types.gen.go.tmpl
index a9d5bc0d..3c959ddc 100644
--- a/parquet/file/column_reader_types.gen.go.tmpl
+++ b/parquet/file/column_reader_types.gen.go.tmpl
@@ -35,7 +35,18 @@ func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
return nvalues, err
}
-// ReadBatch reads batchSize values from the column.
+{{- if .isByteArray}}
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, and
might be shallow copies of the
+// underlying page data, while the latter one reads across multiple pages to
fill the batch and values
+// have been cloned. User should choose the appropriate one based on their use
cases. For example, if user
+// only needs to read and process values one by one, ReadBatchInPage is more
efficient. Otherwise, if user
+// want to cache the values for later use, ReadBatch is more suitable.
+{{- else}}
+// ReadBatchInPage and ReadBatch are used to read batchSize values from the
column, currently they are
+// used in test. The former one only reads values from the same page, while
the latter one reads across
+// multiple pages to fill the batch.
+{{end}}
//
// Returns error if values is not at least big enough to hold the number of
values that will be read.
//
@@ -44,9 +55,23 @@ func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64)
(int64, error) {
//
// total is the number of rows that were read, valuesRead is the actual number
of physical values
// that were read excluding nulls
+func (cr *{{.Name}}ColumnChunkReader) ReadBatchInPage(batchSize int64, values
[]{{.name}}, defLvls, repLvls []int16) (total int64, valuesRead int, err error)
{
+ return cr.readBatchInPage(batchSize, 0, defLvls, repLvls, func(start, len
int64) (int, error) {
+ return
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+ })
+}
+
func (cr *{{.Name}}ColumnChunkReader) ReadBatch(batchSize int64, values
[]{{.name}}, defLvls, repLvls []int16) (total int64, valuesRead int, err error)
{
return cr.readBatch(batchSize, defLvls, repLvls, func(start, len int64)
(int, error) {
- return
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+{{- if .isByteArray}}
+ n, err :=
cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start:start+len])
+ if err == nil {
+ cloneByteArray(values[start : start+len])
+ }
+ return n, err
+{{- else}}
+ return cr.curDecoder.(encoding.{{.Name}}Decoder).Decode(values[start :
start+len])
+{{- end}}
})
}
-{{end}}
+{{end}}
\ No newline at end of file
diff --git a/parquet/file/column_writer_test.go
b/parquet/file/column_writer_test.go
index 90b239e4..f63bd45b 100644
--- a/parquet/file/column_writer_test.go
+++ b/parquet/file/column_writer_test.go
@@ -292,12 +292,41 @@ func (p *PrimitiveWriterTestSuite)
readColumnFully(compression compress.Compress
read := p.ReadBatch(reader, totalValues-valuesRead, valuesRead,
p.DefLevelsOut[valuesRead:], p.RepLevelsOut[valuesRead:])
valuesRead += read
}
+ p.NoError(reader.Close())
return valuesRead
}
+// checkReadColumnByPage is used to check that the data read from the two
interfaces is the same
+func (p *PrimitiveWriterTestSuite) checkReadColumnByPage(compression
compress.Compression) {
+ if p.descr.PhysicalType() != parquet.Types.FixedLenByteArray &&
p.descr.PhysicalType() != parquet.Types.ByteArray {
+ return
+ }
+ totalValues := int64(len(p.DefLevelsOut))
+ pagereader, _ :=
file.NewPageReader(arrutils.NewByteReader(p.readbuffer.Bytes()), totalValues,
compression, mem, nil)
+ reader := file.NewColumnReader(p.descr, pagereader, mem, &p.bufferPool)
+
+ valuesRead := int64(0)
+ for valuesRead < totalValues {
+ read := p.ReadBatchInPage(reader, totalValues-valuesRead,
p.DefLevelsOut, p.RepLevelsOut)
+ switch reader.(type) {
+ case *file.ByteArrayColumnChunkReader:
+ batchOut := p.ValuesOut.([]parquet.ByteArray)
+ pageOut := p.ValuesOutPage.([]parquet.ByteArray)
+ p.Equal(batchOut[valuesRead:valuesRead+read],
pageOut[:read])
+ case *file.FixedLenByteArrayColumnChunkReader:
+ batchOut := p.ValuesOut.([]parquet.FixedLenByteArray)
+ pageOut := p.ValuesOutPage.([]parquet.FixedLenByteArray)
+ p.Equal(batchOut[valuesRead:valuesRead+read],
pageOut[:read])
+ }
+ valuesRead += read
+ }
+ p.NoError(reader.Close())
+}
+
func (p *PrimitiveWriterTestSuite) readAndCompare(compression
compress.Compression, nrows int64) {
p.SetupValuesOut(nrows)
p.readColumnFully(compression)
+ p.checkReadColumnByPage(compression)
p.Equal(p.Values, p.ValuesOut)
}
@@ -394,6 +423,7 @@ func (p *PrimitiveWriterTestSuite)
testDictionaryFallbackEncoding(version parque
// Read all the rows so that we are sure that also the non-dictionary
pages are read correctly
p.SetupValuesOut(VeryLargeSize)
valuesRead := p.readColumnFully(compress.Codecs.Uncompressed)
+ p.checkReadColumnByPage(compress.Codecs.Uncompressed)
p.EqualValues(VeryLargeSize, valuesRead)
p.Equal(p.Values, p.ValuesOut)
diff --git a/parquet/file/file_reader_test.go b/parquet/file/file_reader_test.go
index 45ce0676..1641a680 100644
--- a/parquet/file/file_reader_test.go
+++ b/parquet/file/file_reader_test.go
@@ -99,6 +99,7 @@ type PageSerdeSuite struct {
dataPageHdr format.DataPageHeader
dataPageHdrV2 format.DataPageHeaderV2
+ alloc *memory.CheckedAllocator
pageReader file.PageReader
}
@@ -126,7 +127,8 @@ func (p *PageSerdeSuite) SetupTest() {
func (p *PageSerdeSuite) InitSerializedPageReader(nrows int64, codec
compress.Compression) {
p.EndStream()
- p.pageReader, _ =
file.NewPageReader(utils.NewByteReader(p.buffer.Bytes()), nrows, codec,
memory.DefaultAllocator, nil)
+ p.alloc = memory.NewCheckedAllocator(memory.NewGoAllocator())
+ p.pageReader, _ =
file.NewPageReader(utils.NewByteReader(p.buffer.Bytes()), nrows, codec,
p.alloc, nil)
}
func (p *PageSerdeSuite) WriteDataPageHeader(maxSerialized int, uncompressed,
compressed int32) {
@@ -193,6 +195,8 @@ func (p *PageSerdeSuite) TestDataPageV1() {
p.True(p.pageReader.Next())
currentPage := p.pageReader.Page()
p.CheckDataPageHeader(p.dataPageHdr, currentPage)
+ p.NoError(p.pageReader.Close())
+ p.alloc.AssertSize(p.T(), 0)
}
func (p *PageSerdeSuite) TestDataPageV2() {
@@ -200,12 +204,15 @@ func (p *PageSerdeSuite) TestDataPageV2() {
statsSize = 512
nrows = 4444
)
+
p.dataPageHdrV2.Statistics = getDummyStats(statsSize, true)
p.dataPageHdrV2.NumValues = nrows
- p.WriteDataPageHeaderV2(1024, 0, 0)
+ p.WriteDataPageHeaderV2(1024, 20, 10)
p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
p.True(p.pageReader.Next())
p.CheckDataPageHeaderV2(p.dataPageHdrV2, p.pageReader.Page())
+ p.pageReader.Close()
+ p.alloc.AssertSize(p.T(), 0)
}
func (p *PageSerdeSuite) TestLargePageHeaders() {
@@ -227,6 +234,8 @@ func (p *PageSerdeSuite) TestLargePageHeaders() {
p.InitSerializedPageReader(nrows, compress.Codecs.Uncompressed)
p.True(p.pageReader.Next())
p.CheckDataPageHeader(p.dataPageHdr, p.pageReader.Page())
+ p.NoError(p.pageReader.Close())
+ p.alloc.AssertSize(p.T(), 0)
}
func (p *PageSerdeSuite) TestFailLargePageHeaders() {
@@ -247,6 +256,8 @@ func (p *PageSerdeSuite) TestFailLargePageHeaders() {
p.pageReader.SetMaxPageHeaderSize(smallerMaxSize)
p.NotPanics(func() { p.False(p.pageReader.Next()) })
p.Error(p.pageReader.Err())
+ p.NoError(p.pageReader.Close())
+ p.alloc.AssertSize(p.T(), 0)
}
func (p *PageSerdeSuite) TestCompression() {
@@ -290,6 +301,8 @@ func (p *PageSerdeSuite) TestCompression() {
p.IsType(&file.DataPageV1{}, page)
p.Equal(data, page.Data())
}
+ p.pageReader.Close()
+ p.alloc.AssertSize(p.T(), 0)
p.ResetStream()
})
}
diff --git a/parquet/file/file_writer_test.go b/parquet/file/file_writer_test.go
index ec1f7eb5..05522980 100644
--- a/parquet/file/file_writer_test.go
+++ b/parquet/file/file_writer_test.go
@@ -714,6 +714,7 @@ func (t *PageIndexRoundTripSuite)
readPageIndexes(expectNumRG, expectNumPages in
}
}
+ t.Require().NoError(pgRdr.Close())
t.Require().NoError(pgRdr.Err())
}
}
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 92cc2f0e..1ba7ecbe 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -53,6 +53,8 @@ type PageReader interface {
// Get the dictionary page for this column chunk
GetDictionaryPage() (*DictionaryPage, error)
SeekToPageWithRow(rowIdx int64) error
+ // Close releases the resources held by the reader.
+ Close() error
}
type PageType = format.PageType
@@ -369,14 +371,29 @@ type serializedPageReader struct {
baseOffset, dataOffset, dictOffset int64
- decompressBuffer bytes.Buffer
+ decompressBuffer *memory.Buffer
+ dataPageBuffer *memory.Buffer
+ dictPageBuffer *memory.Buffer
err error
}
+func (p *serializedPageReader) Close() error {
+ if p.decompressBuffer != nil {
+ p.decompressBuffer.Release()
+ p.dictPageBuffer.Release()
+ p.dataPageBuffer.Release()
+ }
+ return nil
+}
+
func (p *serializedPageReader) init(compressType compress.Compression, ctx
*CryptoContext) error {
if p.mem == nil {
p.mem = memory.NewGoAllocator()
}
+ p.decompressBuffer = memory.NewResizableBuffer(p.mem)
+ p.dataPageBuffer = memory.NewResizableBuffer(p.mem)
+ p.dictPageBuffer = memory.NewResizableBuffer(p.mem)
+ p.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize)
codec, err := compress.GetCodec(compressType)
if err != nil {
@@ -384,10 +401,6 @@ func (p *serializedPageReader) init(compressType
compress.Compression, ctx *Cryp
}
p.codec = codec
- if p.decompressBuffer.Cap() < defaultPageHeaderSize {
- p.decompressBuffer.Grow(defaultPageHeaderSize -
p.decompressBuffer.Cap())
- }
-
if ctx != nil {
p.cryptoCtx = *ctx
p.initDecryption()
@@ -423,8 +436,12 @@ func NewPageReader(r parquet.BufferedReader, nrows int64,
compressType compress.
nrows: nrows,
mem: mem,
codec: codec,
+
+ decompressBuffer: memory.NewResizableBuffer(mem),
+ dataPageBuffer: memory.NewResizableBuffer(mem),
+ dictPageBuffer: memory.NewResizableBuffer(mem),
}
- rdr.decompressBuffer.Grow(defaultPageHeaderSize)
+ rdr.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize)
if ctx != nil {
rdr.cryptoCtx = *ctx
rdr.initDecryption()
@@ -441,7 +458,6 @@ func (p *serializedPageReader) Reset(r
parquet.BufferedReader, nrows int64, comp
if p.err != nil {
return
}
- p.decompressBuffer.Reset()
if ctx != nil {
p.cryptoCtx = *ctx
p.initDecryption()
@@ -485,8 +501,9 @@ func (p *serializedPageReader) Page() Page {
}
func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf
[]byte) ([]byte, error) {
- p.decompressBuffer.Grow(lenCompressed)
- if _, err := io.CopyN(&p.decompressBuffer, rd, int64(lenCompressed));
err != nil {
+ p.decompressBuffer.ResizeNoShrink(lenCompressed)
+ b := bytes.NewBuffer(p.decompressBuffer.Bytes()[:0])
+ if _, err := io.CopyN(b, rd, int64(lenCompressed)); err != nil {
return nil, err
}
@@ -544,7 +561,6 @@ func (p *serializedPageReader) GetDictionaryPage()
(*DictionaryPage, error) {
}
p.cryptoCtx.StartDecryptWithDictionaryPage = true
- p.decompressBuffer.Reset()
if p.cryptoCtx.DataDecryptor != nil {
p.updateDecryption(p.cryptoCtx.DataDecryptor,
encryption.DictPageModule, p.dataPageAad)
}
@@ -560,9 +576,8 @@ func (p *serializedPageReader) GetDictionaryPage()
(*DictionaryPage, error) {
return nil, errors.New("parquet: invalid page header
(negative number of values)")
}
- buf := memory.NewResizableBuffer(p.mem)
- defer buf.Release()
- buf.ResizeNoShrink(lenUncompressed)
+ p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
+ buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
data, err := p.decompress(rd, lenCompressed, buf.Bytes())
if err != nil {
@@ -574,7 +589,7 @@ func (p *serializedPageReader) GetDictionaryPage()
(*DictionaryPage, error) {
return &DictionaryPage{
page: page{
- buf: memory.NewBufferBytes(data),
+ buf: buf,
typ: hdr.Type,
nvals: dictHeader.GetNumValues(),
encoding: dictHeader.GetEncoding(),
@@ -682,7 +697,6 @@ func (p *serializedPageReader) Next() bool {
p.err = nil
for p.rowsSeen < p.nrows {
- p.decompressBuffer.Reset()
if err := p.readPageHeader(p.r, p.curPageHdr); err != nil {
if err != io.EOF {
p.err = err
@@ -702,10 +716,6 @@ func (p *serializedPageReader) Next() bool {
p.updateDecryption(p.cryptoCtx.DataDecryptor,
encryption.DictPageModule, p.dataPageAad)
}
- buf := memory.NewResizableBuffer(p.mem)
- defer buf.Release()
- buf.ResizeNoShrink(lenUncompressed)
-
switch p.curPageHdr.GetType() {
case format.PageType_DICTIONARY_PAGE:
p.cryptoCtx.StartDecryptWithDictionaryPage = false
@@ -715,6 +725,9 @@ func (p *serializedPageReader) Next() bool {
return false
}
+ p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
+ buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
+
data, err := p.decompress(p.r, lenCompressed,
buf.Bytes())
if err != nil {
p.err = err
@@ -728,7 +741,7 @@ func (p *serializedPageReader) Next() bool {
// make dictionary page
p.curPage = &DictionaryPage{
page: page{
- buf: memory.NewBufferBytes(data),
+ buf: buf,
typ: p.curPageHdr.Type,
nvals: dictHeader.GetNumValues(),
encoding: dictHeader.GetEncoding(),
@@ -744,6 +757,9 @@ func (p *serializedPageReader) Next() bool {
return false
}
+ p.dataPageBuffer.ResizeNoShrink(lenUncompressed)
+ buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes())
+
firstRowIdx := p.rowsSeen
p.rowsSeen += int64(dataHeader.GetNumValues())
data, err := p.decompress(p.r, lenCompressed,
buf.Bytes())
@@ -759,7 +775,7 @@ func (p *serializedPageReader) Next() bool {
// make datapagev1
p.curPage = &DataPageV1{
page: page{
- buf: memory.NewBufferBytes(data),
+ buf: buf,
typ: p.curPageHdr.Type,
nvals: dataHeader.GetNumValues(),
encoding: dataHeader.GetEncoding(),
@@ -783,6 +799,9 @@ func (p *serializedPageReader) Next() bool {
return false
}
+ p.dataPageBuffer.ResizeNoShrink(lenUncompressed)
+ buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes())
+
compressed := dataHeader.GetIsCompressed()
// extract stats
firstRowIdx := p.rowsSeen
@@ -803,7 +822,6 @@ func (p *serializedPageReader) Next() bool {
} else {
io.ReadFull(p.r, buf.Bytes())
}
- buf.Retain()
if buf.Len() != lenUncompressed {
p.err = fmt.Errorf("parquet: metadata said %d
bytes uncompressed data page, got %d bytes", lenUncompressed, buf.Len())
diff --git a/parquet/file/record_reader.go b/parquet/file/record_reader.go
index a21e0665..2a7aba90 100644
--- a/parquet/file/record_reader.go
+++ b/parquet/file/record_reader.go
@@ -368,6 +368,7 @@ func (rr *recordReader) Release() {
rr.defLevels.Release()
rr.repLevels.Release()
rr.defLevels, rr.repLevels = nil, nil
+ rr.Close()
}
}
diff --git a/parquet/internal/encoding/physical_types.tmpldata
b/parquet/internal/encoding/physical_types.tmpldata
index 0adeb995..b5907266 100644
--- a/parquet/internal/encoding/physical_types.tmpldata
+++ b/parquet/internal/encoding/physical_types.tmpldata
@@ -3,50 +3,58 @@
"Name": "Int32",
"name": "int32",
"lower": "int32",
- "prefix": "arrow"
+ "prefix": "arrow",
+ "isByteArray": false
},
{
"Name": "Int64",
"name": "int64",
"lower": "int64",
- "prefix": "arrow"
+ "prefix": "arrow",
+ "isByteArray": false
},
{
"Name": "Int96",
"name": "parquet.Int96",
"lower": "int96",
- "prefix": "parquet"
+ "prefix": "parquet",
+ "isByteArray": false
},
{
"Name": "Float32",
"name": "float32",
"lower": "float32",
"prefix": "arrow",
- "physical": "Float"
+ "physical": "Float",
+ "isByteArray": false
},
{
"Name": "Float64",
"name": "float64",
"lower": "float64",
"prefix": "arrow",
- "physical": "Double"
+ "physical": "Double",
+ "isByteArray": false
},
{
"Name": "Boolean",
"name": "bool",
"lower": "bool",
- "prefix": "arrow"
+ "prefix": "arrow",
+ "isByteArray": false
},
{
"Name": "ByteArray",
"name": "parquet.ByteArray",
"lower": "byteArray",
- "prefix": "parquet"
+ "prefix": "parquet",
+ "isByteArray": true
},
{
"Name": "FixedLenByteArray",
"name": "parquet.FixedLenByteArray",
"lower": "fixedLenByteArray",
- "prefix": "parquet"
+ "prefix": "parquet",
+ "isByteArray": true
}
]
diff --git a/parquet/internal/testutils/pagebuilder.go
b/parquet/internal/testutils/pagebuilder.go
index 41c2ba6f..95ddf57f 100644
--- a/parquet/internal/testutils/pagebuilder.go
+++ b/parquet/internal/testutils/pagebuilder.go
@@ -234,6 +234,10 @@ type MockPageReader struct {
curpage int
}
+func (m *MockPageReader) Close() error {
+ return nil
+}
+
func (m *MockPageReader) Err() error {
return m.Called().Error(0)
}
diff --git a/parquet/internal/testutils/primitive_typed.go
b/parquet/internal/testutils/primitive_typed.go
index 927ad63a..3f281f32 100644
--- a/parquet/internal/testutils/primitive_typed.go
+++ b/parquet/internal/testutils/primitive_typed.go
@@ -38,9 +38,10 @@ type PrimitiveTypedTest struct {
Buffer *memory.Buffer
Values interface{}
- ValuesOut interface{}
- DefLevelsOut []int16
- RepLevelsOut []int16
+ ValuesOut interface{}
+ ValuesOutPage interface{}
+ DefLevelsOut []int16
+ RepLevelsOut []int16
}
func NewPrimitiveTypedTest(typ reflect.Type) PrimitiveTypedTest {
@@ -49,6 +50,7 @@ func NewPrimitiveTypedTest(typ reflect.Type)
PrimitiveTypedTest {
func (p *PrimitiveTypedTest) SetupValuesOut(nvalues int64) {
p.ValuesOut = reflect.MakeSlice(reflect.SliceOf(p.Typ), int(nvalues),
int(nvalues)).Interface()
+ p.ValuesOutPage = reflect.MakeSlice(reflect.SliceOf(p.Typ),
int(nvalues), int(nvalues)).Interface()
p.DefLevelsOut = make([]int16, nvalues)
p.RepLevelsOut = make([]int16, nvalues)
}
@@ -252,6 +254,37 @@ func (p *PrimitiveTypedTest) ReadBatch(reader
file.ColumnChunkReader, batch, val
}
}
+func (p *PrimitiveTypedTest) ReadBatchInPage(reader file.ColumnChunkReader,
batch int64, defLevels, repLevels []int16) int64 {
+ switch r := reader.(type) {
+ case *file.Int32ColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]int32), defLevels, repLevels)
+ return int64(read)
+ case *file.Int64ColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]int64), defLevels, repLevels)
+ return int64(read)
+ case *file.Float32ColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]float32), defLevels, repLevels)
+ return int64(read)
+ case *file.Float64ColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]float64), defLevels, repLevels)
+ return int64(read)
+ case *file.Int96ColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]parquet.Int96), defLevels, repLevels)
+ return int64(read)
+ case *file.ByteArrayColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]parquet.ByteArray), defLevels, repLevels)
+ return int64(read)
+ case *file.BooleanColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]bool), defLevels, repLevels)
+ return int64(read)
+ case *file.FixedLenByteArrayColumnChunkReader:
+ _, read, _ := r.ReadBatchInPage(batch,
p.ValuesOutPage.([]parquet.FixedLenByteArray), defLevels, repLevels)
+ return int64(read)
+ default:
+ panic("unimplemented")
+ }
+}
+
func Min(v1, v2 interface{}) interface{} {
switch n1 := v1.(type) {
case int32:
diff --git a/parquet/pqarrow/encode_dictionary_test.go
b/parquet/pqarrow/encode_dictionary_test.go
index ac6d4e50..e47b7e6b 100644
--- a/parquet/pqarrow/encode_dictionary_test.go
+++ b/parquet/pqarrow/encode_dictionary_test.go
@@ -290,6 +290,7 @@ func (ad *ArrowWriteDictionarySuite)
TestStatisticsWithFallback() {
}
ad.False(pr.Next())
+ ad.NoError(pr.Close())
}
})
}