This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6dc6926 feat(parquet/file): Add SeekToRow for Column readers (#283)
6dc6926 is described below
commit 6dc6926acfe6e70335511f2df67f57006f8bddec
Author: Matt Topol <[email protected]>
AuthorDate: Thu Feb 20 11:39:10 2025 -0500
feat(parquet/file): Add SeekToRow for Column readers (#283)
### Rationale for this change
Addressing the comments in
https://github.com/apache/arrow-go/issues/278#issuecomment-2657578697 to
allow optimizing reads by skipping entire pages and leveraging the
offset index if one exists.
### What changes are included in this PR?
Deprecating the old `NewColumnReader` and `NewPageReader` functions, as
they really aren't safe to use outside of the package and have proved
difficult to evolve without breaking changes. Instead, users should rely
on the `RowGroupReader` to create the column readers and page readers,
which is generally what consumers already do.
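
A minimal sketch of the recommended pattern (the file name here is
illustrative and error handling is abbreviated; the `RowGroup`/`Column`
calls are the entry points exercised by the tests in this commit):

```go
// Let the RowGroupReader construct the column reader rather than
// calling the deprecated NewColumnReader/NewPageReader directly.
rdr, err := file.OpenParquetFile("data.parquet", false)
if err != nil {
	return err
}
defer rdr.Close()

rgr := rdr.RowGroup(0)
col, err := rgr.Column(0)
if err != nil {
	return err
}

// Assert to the typed reader matching the column's physical type.
icr := col.(*file.Int64ColumnChunkReader)
```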
Adding a `SeekToRow` method on the `ColumnChunkReader` to allow skipping
to a particular row in the column chunk (which also allows quickly
resetting back to the beginning of a column!) along with a
`SeekToPageWithRow` method on the page reader. Also updates the `Skip`
method to properly skip *rows* in a repeated column, not just values.
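
A sketch of the new method in use, mirroring the test added in this
commit (`icr` is the typed reader from the previous sketch; `SeekToRow(0)`
likewise resets the reader to the start of the chunk):

```go
// Seek to row 3. With an OffsetIndex the pager jumps straight to the
// page containing that row; without one it scans pages sequentially.
if err := icr.SeekToRow(3); err != nil {
	return err
}

vals := make([]int64, 5)
defLvls := make([]int16, 5)
repLvls := make([]int16, 5)

// For a repeated column, total counts def/rep levels (a row can span
// several levels) while read counts the physical values decoded.
total, read, err := icr.ReadBatch(5, vals, defLvls, repLvls)
if err != nil {
	return err
}
_ = vals[:read] // values for the rows at and after the seek target
_ = total
```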
### Are these changes tested?
Yes, tests are included.
### Are there any user-facing changes?
Just the new methods. The deprecated functions are not removed yet.
---
internal/utils/buf_reader.go | 26 ++-
parquet/file/column_reader.go | 274 +++++++++++++++++++++------
parquet/file/column_reader_test.go | 175 ++++++++++++++++-
parquet/file/column_reader_types.gen.go | 83 ++------
parquet/file/column_reader_types.gen.go.tmpl | 16 +-
parquet/file/file_reader.go | 32 ++--
parquet/file/file_reader_mmap_windows.go | 2 +-
parquet/file/page_reader.go | 241 +++++++++++++++++++----
parquet/file/record_reader.go | 16 +-
parquet/file/row_group_reader.go | 57 +++++-
parquet/internal/testutils/pagebuilder.go | 40 +++-
parquet/metadata/page_index.go | 76 +++++---
parquet/reader_properties.go | 3 +
13 files changed, 790 insertions(+), 251 deletions(-)
diff --git a/internal/utils/buf_reader.go b/internal/utils/buf_reader.go
index 0b2381d..2a9aa9d 100644
--- a/internal/utils/buf_reader.go
+++ b/internal/utils/buf_reader.go
@@ -23,6 +23,11 @@ import (
"io"
)
+type Reader interface {
+ io.ReadSeeker
+ io.ReaderAt
+}
+
// bufferedReader is similar to bufio.Reader except
// it will expand the buffer if necessary when asked to Peek
// more bytes than are in the buffer
@@ -30,21 +35,14 @@ type bufferedReader struct {
bufferSz int
buf []byte
r, w int
- rd io.Reader
+ rd Reader
err error
}
// NewBufferedReader returns a buffered reader with similar semantics to bufio.Reader
// except Peek will expand the internal buffer if needed rather than return
// an error.
-func NewBufferedReader(rd io.Reader, sz int) *bufferedReader {
-	// if rd is already a buffered reader whose buffer is >= the requested size
- // then just return it as is. no need to make a new object.
- b, ok := rd.(*bufferedReader)
- if ok && len(b.buf) >= sz {
- return b
- }
-
+func NewBufferedReader(rd Reader, sz int) *bufferedReader {
r := &bufferedReader{
rd: rd,
}
@@ -52,6 +50,14 @@ func NewBufferedReader(rd Reader, sz int) *bufferedReader {
return r
}
+func (b *bufferedReader) Outer() Reader { return b.rd }
+
+func (b *bufferedReader) Reset(rd Reader) {
+ b.resetBuffer()
+ b.rd = rd
+ b.r, b.w = 0, 0
+}
+
func (b *bufferedReader) resetBuffer() {
if b.buf == nil {
b.buf = make([]byte, b.bufferSz)
@@ -97,6 +103,8 @@ func (b *bufferedReader) readErr() error {
return err
}
+func (b *bufferedReader) BufferSize() int { return b.bufferSz }
+
// Buffered returns the number of bytes currently buffered
func (b *bufferedReader) Buffered() int { return b.w - b.r }
diff --git a/parquet/file/column_reader.go b/parquet/file/column_reader.go
index 0fb4ed1..5faf8bc 100644
--- a/parquet/file/column_reader.go
+++ b/parquet/file/column_reader.go
@@ -87,6 +87,9 @@ type ColumnChunkReader interface {
	// it encountered. Otherwise this will be nil if it's just the end of the
// column
Err() error
+
+ SeekToRow(rowIdx int64) error
+
// Skip buffered values
consumeBufferedValues(int64)
// number of available buffered values that have not been decoded yet
@@ -113,7 +116,8 @@ type ColumnChunkReader interface {
}
type columnChunkReader struct {
- descr *schema.Column
+ descr *schema.Column
+
rdr PageReader
repetitionDecoder encoding.LevelDecoder
definitionDecoder encoding.LevelDecoder
@@ -135,16 +139,50 @@ type columnChunkReader struct {
// is set when an error is encountered
err error
defLvlBuffer []int16
+ repLvlBuffer []int16
newDictionary bool
}
+func newTypedColumnChunkReader(base columnChunkReader) ColumnChunkReader {
+ switch base.descr.PhysicalType() {
+ case parquet.Types.FixedLenByteArray:
+ base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
+ return &FixedLenByteArrayColumnChunkReader{base}
+ case parquet.Types.Float:
+ base.decoderTraits = &encoding.Float32DecoderTraits
+ return &Float32ColumnChunkReader{base}
+ case parquet.Types.Double:
+ base.decoderTraits = &encoding.Float64DecoderTraits
+ return &Float64ColumnChunkReader{base}
+ case parquet.Types.ByteArray:
+ base.decoderTraits = &encoding.ByteArrayDecoderTraits
+ return &ByteArrayColumnChunkReader{base}
+ case parquet.Types.Int32:
+ base.decoderTraits = &encoding.Int32DecoderTraits
+ return &Int32ColumnChunkReader{base}
+ case parquet.Types.Int64:
+ base.decoderTraits = &encoding.Int64DecoderTraits
+ return &Int64ColumnChunkReader{base}
+ case parquet.Types.Int96:
+ base.decoderTraits = &encoding.Int96DecoderTraits
+ return &Int96ColumnChunkReader{base}
+ case parquet.Types.Boolean:
+ base.decoderTraits = &encoding.BooleanDecoderTraits
+ return &BooleanColumnChunkReader{base}
+ }
+ return nil
+}
+
// NewColumnReader returns a column reader for the provided column initialized with the given pagereader that will
// provide the pages of data for this column. The type is determined from the column passed in.
//
// In addition to the page reader and allocator, a pointer to a shared sync.Pool is expected to provide buffers for temporary
// usage to minimize allocations. The bufferPool should provide *memory.Buffer objects that can be resized as necessary, buffers
// should have `ResizeNoShrink(0)` called on them before being put back into the pool.
+//
+// Deprecated: This function will be removed from the public interface soon as it is currently unsafe to use
+// outside of this package.
func NewColumnReader(descr *schema.Column, pageReader PageReader, mem memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
	base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: bufferPool}
switch descr.PhysicalType() {
@@ -197,6 +235,15 @@ func (c *columnChunkReader) getDefLvlBuffer(sz int64) []int16 {
return c.defLvlBuffer[:sz]
}
+func (c *columnChunkReader) getRepLvlBuffer(sz int64) []int16 {
+ if int64(cap(c.repLvlBuffer)) < sz {
+ c.repLvlBuffer = make([]int16, sz)
+ return c.repLvlBuffer
+ }
+
+ return c.repLvlBuffer[:sz]
+}
+
// HasNext returns whether there is more data to be read in this column
// and row group.
func (c *columnChunkReader) HasNext() bool {
@@ -206,6 +253,23 @@ func (c *columnChunkReader) HasNext() bool {
return true
}
+func (c *columnChunkReader) readDictionary() error {
+ if c.newDictionary {
+ return nil
+ }
+
+ page, err := c.pager().GetDictionaryPage()
+ if err != nil {
+ return err
+ }
+
+ if page != nil {
+ return c.configureDict(page)
+ }
+
+ return nil
+}
+
func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
enc := page.encoding
	if enc == format.Encoding_PLAIN_DICTIONARY || enc == format.Encoding_PLAIN {
@@ -233,6 +297,30 @@ func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
return nil
}
+func (c *columnChunkReader) processPage() (bool, error) {
+ var (
+ err error
+ lvlByteLen int64
+ )
+ switch p := c.curPage.(type) {
+ case *DictionaryPage:
+ return false, c.configureDict(p)
+ case *DataPageV1:
+		lvlByteLen, err = c.initLevelDecodersV1(p, p.repLvlEncoding, p.defLvlEncoding)
+ case *DataPageV2:
+ lvlByteLen, err = c.initLevelDecodersV2(p)
+ default:
+ // we can skip non-data pages
+ return false, nil
+ }
+
+ if err != nil {
+ return true, err
+ }
+
+ return true, c.initDataDecoder(c.curPage, lvlByteLen)
+}
+
// read a new page from the page reader
func (c *columnChunkReader) readNewPage() bool {
for c.rdr.Next() { // keep going until we get a data page
@@ -241,31 +329,15 @@ func (c *columnChunkReader) readNewPage() bool {
break
}
- var lvlByteLen int64
- switch p := c.curPage.(type) {
- case *DictionaryPage:
- if err := c.configureDict(p); err != nil {
- c.err = err
- return false
- }
- continue
- case *DataPageV1:
-		lvlByteLen, c.err = c.initLevelDecodersV1(p, p.repLvlEncoding, p.defLvlEncoding)
- if c.err != nil {
- return false
- }
- case *DataPageV2:
- lvlByteLen, c.err = c.initLevelDecodersV2(p)
- if c.err != nil {
- return false
- }
- default:
- // we can skip non-data pages
- continue
+ gotDataPage, err := c.processPage()
+ if err != nil {
+ c.err = err
+ return false
}
- c.err = c.initDataDecoder(c.curPage, lvlByteLen)
- return c.err == nil
+ if gotDataPage {
+ return true
+ }
}
c.err = c.rdr.Err()
return false
@@ -283,6 +355,9 @@ func (c *columnChunkReader) initLevelDecodersV2(page *DataPageV2) (int64, error)
if c.descr.MaxRepetitionLevel() > 0 {
		c.repetitionDecoder.SetDataV2(page.repLvlByteLen, c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
+ if c.repLvlBuffer != nil {
+ c.repLvlBuffer = c.repLvlBuffer[:0]
+ }
}
// ARROW-17453: Some writers will write repetition levels even when
// the max repetition level is 0, so we should respect the value
@@ -339,6 +414,11 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
encoding := page.Encoding()
if isDictIndexEncoding(encoding) {
+		// if we're seeking or otherwise skipping pages, we may not have read
+		// the dictionary page in yet, so let's ensure we got it if one exists
+ if err := c.readDictionary(); err != nil {
+ return err
+ }
encoding = format.Encoding_RLE_DICTIONARY
}
@@ -396,6 +476,9 @@ func (c *columnChunkReader) readRepetitionLevels(levels []int16) int {
return 0
}
+ if len(c.repLvlBuffer) > 0 {
+ return copy(levels, c.repLvlBuffer[c.numDecoded:])
+ }
nlevels, _ := c.repetitionDecoder.Decode(levels)
return nlevels
}
@@ -436,49 +519,114 @@ func (c *columnChunkReader) determineNumToRead(batchLen int64, defLvls, repLvls
return
}
-// skipValues some number of rows using readFn as the function to read the data and throw it away.
-// If we can skipValues a whole page based on its metadata, then we do so, otherwise we read the
-// page until we have skipped the number of rows desired.
-func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, buf []byte) (int64, error)) (int64, error) {
- var err error
- toskip := nvalues
- for c.HasNext() && toskip > 0 {
-		// if number to skip is more than the number of undecoded values, skip the page
- if toskip > (c.numBuffered - c.numDecoded) {
- toskip -= c.numBuffered - c.numDecoded
- c.numDecoded = c.numBuffered
- } else {
- var (
- batchSize int64 = 1024
- valsRead int64 = 0
- )
-
- scratch := c.bufferPool.Get().(*memory.Buffer)
- defer func() {
- scratch.ResizeNoShrink(0)
- c.bufferPool.Put(scratch)
- }()
- bufMult := 1
- if c.descr.PhysicalType() == parquet.Types.Boolean {
- // for bools, BytesRequired returns 1 byte per
8 bool, but casting []byte to []bool requires 1 byte per 1 bool
- bufMult = 8
+// SeekToRow will seek to the row index provided in the column chunk. If
+// the metadata contains an OffsetIndex for skipping pages based on row indexes
+// then the pager will use that to skip to the correct page.
+//
+// If there is no OffsetIndex, then the pager will read each page until it
+// finds the page that contains the desired row index, and the Column Chunk
+// reader will discard values until it reaches the desired row index according
+// to the definition and repetition levels.
+func (c *columnChunkReader) SeekToRow(rowIdx int64) error {
+ if err := c.pager().SeekToPageWithRow(rowIdx); err != nil {
+ return err
+ }
+
+ c.numBuffered, c.numDecoded = 0, 0
+ c.curPage = c.rdr.Page()
+ if c.curPage == nil {
+ c.err = c.rdr.Err()
+ return c.err
+ }
+
+ gotDataPage, err := c.processPage()
+ if err != nil {
+ c.err = err
+ return err
+ }
+
+ if !gotDataPage {
+ c.readNewPage()
+ }
+
+ return c.skipRows(rowIdx - c.curPage.(DataPage).FirstRowIndex())
+}
+
+func (c *columnChunkReader) skipRows(nrows int64) error {
+ toSkip := nrows
+ for c.HasNext() && toSkip > 0 {
+		// if there are no repetition levels, then this is easy! each level
+ // is one row so we just use the definition levels to determine
+ // the number of physical values to discard!
+ if c.descr.MaxRepetitionLevel() == 0 {
+ if toSkip >= (c.numBuffered - c.numDecoded) {
+ toSkip -= c.numBuffered - c.numDecoded
+ c.numDecoded = c.numBuffered
+ continue
+ }
+
+			ndefs, nvals, err := c.determineNumToRead(toSkip, nil, nil)
+ if err != nil {
+ c.err = err
+ return err
+ }
+
+ skipped, err := c.curDecoder.Discard(int(nvals))
+ if err != nil {
+ c.err = err
+ return err
}
-			scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
-
- for {
- batchSize = utils.Min(batchSize, toskip)
- valsRead, err = readFn(batchSize, scratch.Buf())
- toskip -= valsRead
- if valsRead <= 0 || toskip <= 0 || err != nil {
- break
+
+ skipped = max(ndefs, skipped)
+
+ toSkip -= int64(skipped)
+ c.consumeBufferedValues(int64(skipped))
+ } else {
+			// with repetition levels, we have to check them to determine
+			// how many rows to skip. we can't just skip the number of values
+			// because there could be multiple values per row. So we read in
+			// the repetition levels for the entire page at once and then go
+			// through them to find the right row.
+ repLvls := c.getRepLvlBuffer(c.numBuffered)
+ nreps, _ := c.repetitionDecoder.Decode(repLvls)
+
+ rowsSkipped := int64(0)
+ levelsToSkip := -1
+ for i, def := range repLvls[:nreps] {
+ if def == 0 {
+ if rowsSkipped == toSkip {
+ levelsToSkip = i
+ break
+ }
+ rowsSkipped++
}
}
+
+ if levelsToSkip == -1 {
+ toSkip -= rowsSkipped
+ c.numBuffered, c.numDecoded = 0, 0
+ continue
+ }
+
+ var valuesToSkip int64
+ if c.descr.MaxDefinitionLevel() > 0 {
+				defLvls := c.getDefLvlBuffer(int64(levelsToSkip))
+				_, valuesToSkip = c.readDefinitionLevels(defLvls)
+ } else {
+ valuesToSkip = int64(levelsToSkip)
+ }
+
+ skipped, err := c.curDecoder.Discard(int(valuesToSkip))
+ if err != nil {
+ c.err = err
+ return err
+ }
+
+ toSkip -= int64(skipped)
+ c.consumeBufferedValues(int64(levelsToSkip))
}
}
- if c.err != nil {
- err = c.err
- }
- return nvalues - toskip, err
+ return nil
}
type readerFunc func(int64, int64) (int, error)
@@ -498,7 +646,7 @@ func (c *columnChunkReader) readBatch(batchSize int64, defLvls, repLvls []int16,
toRead int64
)
- for c.HasNext() && totalLvls < batchSize && err == nil {
+ for totalLvls < batchSize && c.HasNext() && err == nil {
if defLvls != nil {
defs = defLvls[totalLvls:]
}
diff --git a/parquet/file/column_reader_test.go b/parquet/file/column_reader_test.go
index dd59955..f96156a 100644
--- a/parquet/file/column_reader_test.go
+++ b/parquet/file/column_reader_test.go
@@ -17,6 +17,8 @@
package file_test
import (
+ "bytes"
+ "fmt"
"math"
"math/rand"
"reflect"
@@ -24,13 +26,17 @@ import (
"sync"
"testing"
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/internal/testutils"
+ "github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/apache/arrow-go/v18/parquet/schema"
"github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@@ -574,7 +580,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
p.Run("Dict: Plain, Data: RLEDict", func() {
		dictPage := file.NewDictionaryPage(dummy, 0, parquet.Encodings.Plain)
-		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0, 0)
p.pages = append(p.pages, dictPage, dataPage)
p.initReader(descr)
@@ -585,7 +591,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
p.Run("Dict: Plain Dictionary, Data: Plain Dictionary", func() {
		dictPage := file.NewDictionaryPage(dummy, 0, parquet.Encodings.PlainDict)
-		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.PlainDict, dummy, nil, nil, 0, 0)
+		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.PlainDict, dummy, nil, nil, 0, 0, 0)
p.pages = append(p.pages, dictPage, dataPage)
p.initReader(descr)
p.NotPanics(func() { p.reader.HasNext() })
@@ -594,7 +600,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
})
p.Run("Panic if dict page not first", func() {
-		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0, 0)
p.pages = append(p.pages, dataPage)
p.initReader(descr)
p.NotPanics(func() { p.False(p.reader.HasNext()) })
@@ -622,7 +628,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
})
p.Run("Unsupported encoding", func() {
-		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.DeltaByteArray, dummy, nil, nil, 0, 0)
+		dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, nil, 0, parquet.Encodings.DeltaByteArray, dummy, nil, nil, 0, 0, 0)
p.pages = append(p.pages, dataPage)
p.initReader(descr)
p.Panics(func() { p.reader.HasNext() })
@@ -633,6 +639,82 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() {
p.pages = p.pages[:2]
}
+func (p *PrimitiveReaderSuite) TestSeekToRowRequired() {
+ const (
+ levelsPerPage int = 100
+ npages int = 50
+ )
+
+	for _, enc := range []parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLEDict} {
+ p.maxDefLvl, p.maxRepLvl = 0, 0
+		typ := schema.NewInt32Node("a", parquet.Repetitions.Required, -1)
+ d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+
+		p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = makePages(p.dataPageVersion,
+ d, npages, levelsPerPage, reflect.TypeOf(int32(0)), enc)
+ p.nlevels = npages * levelsPerPage
+ p.initReader(d)
+
+		p.pager.(*testutils.MockPageReader).TestData().Set("row_map", func(idx int64) int {
+ return (int(idx) / levelsPerPage) + 1
+ })
+
+ // check seek back to beginning
+ p.checkResults(reflect.TypeOf(int32(0)))
+ p.Require().NoError(p.reader.SeekToRow(0))
+ p.checkResults(reflect.TypeOf(int32(0)))
+
+ p.Require().NoError(p.reader.SeekToRow(550))
+ p.nvalues -= 550
+ p.nlevels -= 550
+ p.values = p.values.Slice(550, p.values.Len())
+ p.checkResults(reflect.TypeOf(int32(0)))
+ p.clear()
+ }
+}
+
+func (p *PrimitiveReaderSuite) TestSeekToRowOptional() {
+ const (
+ levelsPerPage int = 100
+ npages int = 50
+ )
+
+	for _, enc := range []parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLEDict} {
+ p.maxDefLvl, p.maxRepLvl = 4, 0
+		typ := schema.NewInt32Node("a", parquet.Repetitions.Optional, -1)
+ d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+
+		p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = makePages(p.dataPageVersion,
+ d, npages, levelsPerPage, reflect.TypeOf(int32(0)), enc)
+ p.nlevels = npages * levelsPerPage
+ p.initReader(d)
+
+		p.pager.(*testutils.MockPageReader).TestData().Set("row_map", func(idx int64) int {
+ return (int(idx) / levelsPerPage) + 1
+ })
+
+ // check seek back to beginning
+ p.checkResults(reflect.TypeOf(int32(0)))
+ p.Require().NoError(p.reader.SeekToRow(0))
+ p.checkResults(reflect.TypeOf(int32(0)))
+
+ p.Require().NoError(p.reader.SeekToRow(550))
+ realValuesSkipped := 0
+ for i := 0; i < 550; i++ {
+ if p.defLevels[i] == p.maxDefLvl {
+ realValuesSkipped++
+ }
+ }
+
+ p.nvalues -= realValuesSkipped
+ p.values = p.values.Slice(realValuesSkipped, p.values.Len())
+ p.nlevels -= 550
+ p.defLevels = p.defLevels[550:]
+ p.checkResults(reflect.TypeOf(int32(0)))
+ p.clear()
+ }
+}
+
func TestPrimitiveReader(t *testing.T) {
t.Parallel()
t.Run("datapage v1", func(t *testing.T) {
@@ -642,3 +724,88 @@ func TestPrimitiveReader(t *testing.T) {
		suite.Run(t, &PrimitiveReaderSuite{dataPageVersion: parquet.DataPageV2})
})
}
+
+func TestFullSeekRow(t *testing.T) {
+ mem := memory.DefaultAllocator
+
+	for _, dataPageVersion := range []parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} {
+		t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1), func(t *testing.T) {
+
+			props := parquet.NewWriterProperties(parquet.WithAllocator(mem),
+				parquet.WithDataPageVersion(dataPageVersion), parquet.WithDataPageSize(1),
+ parquet.WithPageIndexEnabled(true))
+
+ sc := arrow.NewSchema([]arrow.Field{
+				{Name: "c0", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
+				{Name: "c1", Type: arrow.BinaryTypes.String, Nullable: true},
+				{Name: "c2", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64), Nullable: true},
+ }, nil)
+
+ tbl, err := array.TableFromJSON(mem, sc, []string{`[
+ {"c0": 1, "c1": "a", "c2": [1]},
+ {"c0": 2, "c1": "b", "c2": [1, 2]},
+ {"c0": 3, "c1": "c", "c2": [null]},
+ {"c0": null, "c1": "d", "c2": []},
+ {"c0": 5, "c1": null, "c2": [3, 3, 3]},
+ {"c0": 6, "c1": "f", "c2": null}
+ ]`})
+ require.NoError(t, err)
+ defer tbl.Release()
+
+ schema := tbl.Schema()
+			arrWriterProps := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
+
+ var buf bytes.Buffer
+			wr, err := pqarrow.NewFileWriter(schema, &buf, props, arrWriterProps)
+ require.NoError(t, err)
+
+ require.NoError(t, wr.WriteTable(tbl, tbl.NumRows()))
+ require.NoError(t, wr.Close())
+
+			rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()),
+				file.WithReadProps(parquet.NewReaderProperties(mem)))
+ require.NoError(t, err)
+ defer rdr.Close()
+
+ rgr := rdr.RowGroup(0)
+ col, err := rgr.Column(0)
+ require.NoError(t, err)
+
+ icr := col.(*file.Int64ColumnChunkReader)
+ require.NoError(t, icr.SeekToRow(3))
+
+ vals := make([]int64, 5)
+ defLvls := make([]int16, 5)
+ repLvls := make([]int16, 5)
+
+			total, read, err := icr.ReadBatch(5, vals, defLvls, repLvls)
+ require.NoError(t, err)
+
+ assert.EqualValues(t, 3, total)
+ assert.EqualValues(t, 2, read)
+
+ assert.Equal(t, []int64{5, 6}, vals[:read])
+ assert.Equal(t, []int16{0, 1, 1}, defLvls[:total])
+ assert.Equal(t, []int16{0, 0, 0}, repLvls[:total])
+
+ col2, err := rgr.Column(2)
+ require.NoError(t, err)
+
+ icr = col2.(*file.Int64ColumnChunkReader)
+ require.NoError(t, icr.SeekToRow(3))
+
+			total, read, err = icr.ReadBatch(5, vals, defLvls, repLvls)
+ require.NoError(t, err)
+
+ // 5 definition levels are read for the last 3 rows
+ // because of the repetitions
+ assert.EqualValues(t, 5, total)
+ // only 3 physical values though
+ assert.EqualValues(t, 3, read)
+
+ assert.Equal(t, []int64{3, 3, 3}, vals[:read])
+ assert.Equal(t, []int16{1, 3, 3, 3, 0}, defLvls[:total])
+ assert.Equal(t, []int16{0, 0, 1, 1, 0}, repLvls[:total])
+ })
+ }
+}
diff --git a/parquet/file/column_reader_types.gen.go b/parquet/file/column_reader_types.gen.go
index 08202e5..a60cf8e 100644
--- a/parquet/file/column_reader_types.gen.go
+++ b/parquet/file/column_reader_types.gen.go
@@ -19,9 +19,6 @@
package file
import (
- "unsafe"
-
- "github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/internal/encoding"
)
@@ -35,14 +32,8 @@ type Int32ColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *Int32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- arrow.Int32Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
@@ -69,14 +60,8 @@ type Int64ColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *Int64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- arrow.Int64Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
@@ -103,14 +88,8 @@ type Int96ColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *Int96ColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- parquet.Int96Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
@@ -137,14 +116,8 @@ type Float32ColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *Float32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- arrow.Float32Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
@@ -171,14 +144,8 @@ type Float64ColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *Float64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- arrow.Float64Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
@@ -205,14 +172,8 @@ type BooleanColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *BooleanColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- *(*[]bool)(unsafe.Pointer(&buf)),
- nil,
- nil)
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
@@ -239,14 +200,8 @@ type ByteArrayColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- parquet.ByteArrayTraits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
@@ -273,14 +228,8 @@ type FixedLenByteArrayColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
-				parquet.FixedLenByteArrayTraits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
diff --git a/parquet/file/column_reader_types.gen.go.tmpl b/parquet/file/column_reader_types.gen.go.tmpl
index 3b9963a..a9d5bc0 100644
--- a/parquet/file/column_reader_types.gen.go.tmpl
+++ b/parquet/file/column_reader_types.gen.go.tmpl
@@ -31,20 +31,8 @@ type {{.Name}}ColumnChunkReader struct {
// Skip skips the next nvalues so that the next call to ReadBatch
// will start reading *after* the skipped values.
func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64) (int64, error) {
- return cr.columnChunkReader.skipValues(nvalues,
- func(batch int64, buf []byte) (int64, error) {
- vals, _, err := cr.ReadBatch(batch,
- {{- if ne .Name "Boolean"}}
- {{.prefix}}.{{.Name}}Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf),
- arrow.Int16Traits.CastFromBytes(buf))
- {{- else}}
- *(*[]bool)(unsafe.Pointer(&buf)),
- nil,
- nil)
- {{- end}}
- return vals, err
- })
+ err := cr.columnChunkReader.skipRows(nvalues)
+ return nvalues, err
}
// ReadBatch reads batchSize values from the column.
diff --git a/parquet/file/file_reader.go b/parquet/file/file_reader.go
index 7807ac6..ddccd79 100644
--- a/parquet/file/file_reader.go
+++ b/parquet/file/file_reader.go
@@ -120,7 +120,16 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts ...ReadOption) (*Reader, er
}
if f.metadata == nil {
- return f, f.parseMetaData()
+ if err := f.parseMetaData(); err != nil {
+ return nil, err
+ }
+ }
+
+ f.pageIndexReader = &metadata.PageIndexReader{
+ Input: f.r,
+ Props: f.props,
+ FileMetadata: f.metadata,
+ Decryptor: f.fileDecryptor,
}
return f, nil
@@ -305,23 +314,16 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
rg := f.metadata.RowGroups[i]
return &RowGroupReader{
- fileMetadata: f.metadata,
-		rgMetadata:    metadata.NewRowGroupMetaData(rg, f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
- props: f.props,
- r: f.r,
- fileDecryptor: f.fileDecryptor,
- bufferPool: &f.bufferPool,
+ fileMetadata: f.metadata,
+		rgMetadata:      metadata.NewRowGroupMetaData(rg, f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
+ props: f.props,
+ r: f.r,
+ fileDecryptor: f.fileDecryptor,
+ bufferPool: &f.bufferPool,
+ pageIndexReader: f.pageIndexReader,
}
}
func (f *Reader) GetPageIndexReader() *metadata.PageIndexReader {
- if f.pageIndexReader == nil {
- f.pageIndexReader = &metadata.PageIndexReader{
- Input: f.r,
- Props: f.props,
- FileMetadata: f.metadata,
- Decryptor: f.fileDecryptor,
- }
- }
return f.pageIndexReader
}
diff --git a/parquet/file/file_reader_mmap_windows.go b/parquet/file/file_reader_mmap_windows.go
index 512370f..92e9620 100644
--- a/parquet/file/file_reader_mmap_windows.go
+++ b/parquet/file/file_reader_mmap_windows.go
@@ -25,6 +25,6 @@ import (
"github.com/apache/arrow-go/v18/parquet"
)
-func mmapOpen(filename string) (parquet.ReaderAtSeeker, error) {
+func mmapOpen(_ string) (parquet.ReaderAtSeeker, error) {
return nil, errors.New("mmap not implemented on windows")
}
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 2226b84..d3639dd 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -18,12 +18,15 @@ package file
import (
"bytes"
+ "errors"
"fmt"
"io"
+ "sort"
"sync"
"github.com/JohnCGriffin/overflow"
"github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/compress"
"github.com/apache/arrow-go/v18/parquet/internal/encryption"
@@ -47,6 +50,10 @@ type PageReader interface {
Err() error
// Reset allows reusing a page reader
	Reset(r parquet.BufferedReader, nrows int64, compressType compress.Compression, ctx *CryptoContext)
+
+ // Get the dictionary page for this column chunk
+ GetDictionaryPage() (*DictionaryPage, error)
+ SeekToPageWithRow(rowIdx int64) error
}
type PageType = format.PageType
@@ -342,7 +349,11 @@ func (d *DictionaryPage) Release() {
func (d *DictionaryPage) IsSorted() bool { return d.sorted }
type serializedPageReader struct {
- r parquet.BufferedReader
+ r parquet.BufferedReader
+ chunk *metadata.ColumnChunkMetaData
+ colIdx int
+ pgIndexReader *metadata.RowGroupPageIndexReader
+
nrows int64
rowsSeen int64
mem memory.Allocator
@@ -357,11 +368,46 @@ type serializedPageReader struct {
dataPageAad string
dataPageHeaderAad string
+ baseOffset, dataOffset, dictOffset int64
+
decompressBuffer bytes.Buffer
err error
}
+func (p *serializedPageReader) init(compressType compress.Compression, ctx *CryptoContext) error {
+ if p.mem == nil {
+ p.mem = memory.NewGoAllocator()
+ }
+
+ codec, err := compress.GetCodec(compressType)
+ if err != nil {
+ return err
+ }
+ p.codec = codec
+
+ if p.decompressBuffer.Cap() < defaultPageHeaderSize {
+		p.decompressBuffer.Grow(defaultPageHeaderSize - p.decompressBuffer.Cap())
+ }
+
+ if ctx != nil {
+ p.cryptoCtx = *ctx
+ p.initDecryption()
+ }
+
+ p.baseOffset = p.chunk.DataPageOffset()
+ p.dataOffset = p.baseOffset
+ if p.chunk.HasDictionaryPage() && p.chunk.DictionaryPageOffset() > 0 {
+ p.baseOffset = p.chunk.DictionaryPageOffset()
+ p.dictOffset = p.baseOffset
+ }
+
+ return nil
+}
+
// NewPageReader returns a page reader for the data which can be read from the provided reader and compression.
+//
+// Deprecated: This function isn't properly safe for public API use and should not be utilized
+// anymore. It will be removed from the public interface soon to prevent usage outside of this package.
func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, error) {
if mem == nil {
mem = memory.NewGoAllocator()
@@ -439,9 +485,9 @@ func (p *serializedPageReader) Page() Page {
return p.curPage
}
-func (p *serializedPageReader) decompress(lenCompressed int, buf []byte) ([]byte, error) {
+func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
p.decompressBuffer.Grow(lenCompressed)
-	if _, err := io.CopyN(&p.decompressBuffer, p.r, int64(lenCompressed)); err != nil {
+	if _, err := io.CopyN(&p.decompressBuffer, rd, int64(lenCompressed)); err != nil {
return nil, err
}
@@ -482,6 +528,149 @@ func extractStats(dataHeader dataheader) (pageStats metadata.EncodedStatistics)
return
}
+func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
+ if p.dictOffset > 0 {
+ hdr := format.NewPageHeader()
+ rd := utils.NewBufferedReader(
+			io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset),
+ p.r.BufferSize())
+ if err := p.readPageHeader(rd, hdr); err != nil {
+ return nil, err
+ }
+
+ dictHeader := hdr.GetDictionaryPageHeader()
+ if dictHeader == nil {
+			return nil, errors.New("parquet: invalid dictionary page header")
+ }
+
+ p.cryptoCtx.StartDecryptWithDictionaryPage = true
+ p.decompressBuffer.Reset()
+ if p.cryptoCtx.DataDecryptor != nil {
+			p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
+ }
+
+ lenCompressed := int(hdr.GetCompressedPageSize())
+ lenUncompressed := int(hdr.GetUncompressedPageSize())
+ if lenCompressed < 0 || lenUncompressed < 0 {
+ return nil, errors.New("parquet: invalid page header")
+ }
+
+ p.cryptoCtx.StartDecryptWithDictionaryPage = false
+ if dictHeader.GetNumValues() < 0 {
+			return nil, errors.New("parquet: invalid page header (negative number of values)")
+ }
+
+ buf := memory.NewResizableBuffer(p.mem)
+ defer buf.Release()
+ buf.ResizeNoShrink(lenUncompressed)
+
+ data, err := p.decompress(rd, lenCompressed, buf.Bytes())
+ if err != nil {
+ return nil, err
+ }
+ if len(data) != lenUncompressed {
+			return nil, fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data))
+ }
+
+ return &DictionaryPage{
+ page: page{
+ buf: memory.NewBufferBytes(data),
+ typ: hdr.Type,
+ nvals: dictHeader.GetNumValues(),
+ encoding: dictHeader.GetEncoding(),
+ },
+		sorted: dictHeader.IsSetIsSorted() && dictHeader.GetIsSorted(),
+ }, nil
+ }
+
+ return nil, nil
+}
+
+func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *format.PageHeader) error {
+ allowedPgSz := defaultPageHeaderSize
+ for {
+ view, err := rd.Peek(allowedPgSz)
+ if err != nil && err != io.EOF {
+ return err
+ }
+
+ if len(view) == 0 {
+ return io.EOF
+ }
+
+ extra := 0
+ if p.cryptoCtx.MetaDecryptor != nil {
+			p.updateDecryption(p.cryptoCtx.MetaDecryptor, encryption.DictPageHeaderModule, p.dataPageHeaderAad)
+ view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
+ extra = p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta()
+ }
+
+ remaining, err := thrift.DeserializeThrift(hdr, view)
+ if err != nil {
+ allowedPgSz *= 2
+ if allowedPgSz > p.maxPageHeaderSize {
+				return errors.New("parquet: deserializing page header failed")
+ }
+ continue
+ }
+
+ rd.Discard(len(view) - int(remaining) + extra)
+ break
+ }
+ return nil
+}
+
+func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error {
+ if rowIdx < 0 || rowIdx >= p.nrows {
+		return fmt.Errorf("parquet: cannot seek column reader to row index %d", rowIdx)
+ }
+
+ var (
+ oidx metadata.OffsetIndex
+ err error
+ )
+
+ if p.pgIndexReader != nil {
+ oidx, err = p.pgIndexReader.GetOffsetIndex(p.colIdx)
+ if err != nil {
+ return err
+ }
+ }
+
+ section := p.r.Outer()
+ if oidx == nil {
+		if _, err = section.Seek(p.dataOffset-p.baseOffset, io.SeekStart); err != nil {
+ return err
+ }
+ p.r.Reset(section)
+
+ p.rowsSeen = 0
+ p.pageOrd = 0
+
+ for p.Next() && p.rowsSeen < rowIdx {
+ }
+ return p.err
+ }
+
+ pages := oidx.GetPageLocations()
+ index := sort.Search(len(pages), func(i int) bool {
+ return pages[i].FirstRowIndex > rowIdx
+ }) - 1
+
+ if index < 0 {
+ return fmt.Errorf("parquet: seek out of range")
+ }
+
+	if _, err = section.Seek(pages[index].GetOffset()-p.baseOffset, io.SeekStart); err != nil {
+ return err
+ }
+
+ p.r.Reset(section)
+ p.rowsSeen, p.pageOrd = pages[index].FirstRowIndex, int16(index)
+ p.Next()
+ return p.err
+}
+
func (p *serializedPageReader) Next() bool {
	// Loop here because there may be unhandled page types that we skip until
// finding a page that we do know what to do with
@@ -493,44 +682,19 @@ func (p *serializedPageReader) Next() bool {
p.err = nil
for p.rowsSeen < p.nrows {
- allowedPgSz := defaultPageHeaderSize
p.decompressBuffer.Reset()
- for {
- view, err := p.r.Peek(allowedPgSz)
- if err != nil && err != io.EOF {
+ if err := p.readPageHeader(p.r, p.curPageHdr); err != nil {
+ if err != io.EOF {
p.err = err
- return false
}
- if len(view) == 0 {
- return false
- }
-
- extra := 0
- if p.cryptoCtx.MetaDecryptor != nil {
-				p.updateDecryption(p.cryptoCtx.MetaDecryptor, encryption.DictPageHeaderModule, p.dataPageHeaderAad)
- view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
-				extra = p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta()
- }
-
-			remaining, err := thrift.DeserializeThrift(p.curPageHdr, view)
- if err != nil {
- allowedPgSz *= 2
- if allowedPgSz > p.maxPageHeaderSize {
-				p.err = xerrors.New("parquet: deserializing page header failed")
- return false
- }
- continue
- }
-
- p.r.Discard(len(view) - int(remaining) + extra)
- break
+ return false
}
lenCompressed := int(p.curPageHdr.GetCompressedPageSize())
lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize())
if lenCompressed < 0 || lenUncompressed < 0 {
- p.err = xerrors.New("parquet: invalid page header")
+ p.err = errors.New("parquet: invalid page header")
return false
}
@@ -551,7 +715,7 @@ func (p *serializedPageReader) Next() bool {
return false
}
- data, err := p.decompress(lenCompressed, buf.Bytes())
+		data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
if err != nil {
p.err = err
return false
@@ -580,8 +744,9 @@ func (p *serializedPageReader) Next() bool {
return false
}
+ firstRowIdx := p.rowsSeen
p.rowsSeen += int64(dataHeader.GetNumValues())
- data, err := p.decompress(lenCompressed, buf.Bytes())
+			data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
if err != nil {
p.err = err
return false
@@ -603,6 +768,7 @@ func (p *serializedPageReader) Next() bool {
				repLvlEncoding:   dataHeader.GetRepetitionLevelEncoding(),
uncompressedSize: int32(lenUncompressed),
statistics: extractStats(dataHeader),
+ firstRowIndex: firstRowIdx,
}
case format.PageType_DATA_PAGE_V2:
p.pageOrd++
@@ -619,7 +785,8 @@ func (p *serializedPageReader) Next() bool {
compressed := dataHeader.GetIsCompressed()
// extract stats
- p.rowsSeen += int64(dataHeader.GetNumValues())
+ firstRowIdx := p.rowsSeen
+ p.rowsSeen += int64(dataHeader.GetNumRows())
			levelsBytelen, ok := overflow.Add(int(dataHeader.GetDefinitionLevelsByteLength()), int(dataHeader.GetRepetitionLevelsByteLength()))
if !ok {
				p.err = xerrors.New("parquet: levels size too large (corrupt file?)")
@@ -630,7 +797,7 @@ func (p *serializedPageReader) Next() bool {
if levelsBytelen > 0 {
					io.ReadFull(p.r, buf.Bytes()[:levelsBytelen])
}
-			if _, p.err = p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
+			if _, p.err = p.decompress(p.r, lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
return false
}
} else {
@@ -658,11 +825,13 @@ func (p *serializedPageReader) Next() bool {
compressed: compressed,
uncompressedSize: int32(lenUncompressed),
statistics: extractStats(dataHeader),
+ firstRowIndex: firstRowIdx,
}
default:
		// we don't know this page type, we're allowed to skip non-data pages
continue
}
+
return true
}
diff --git a/parquet/file/record_reader.go b/parquet/file/record_reader.go
index d8c627d..87c2713 100644
--- a/parquet/file/record_reader.go
+++ b/parquet/file/record_reader.go
@@ -132,12 +132,16 @@ type primitiveRecordReader struct {
func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, bufferPool *sync.Pool) primitiveRecordReader {
return primitiveRecordReader{
- ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool),
- values: memory.NewResizableBuffer(mem),
- validBits: memory.NewResizableBuffer(mem),
- mem: mem,
- refCount: 1,
-		useValues:         descr.PhysicalType() != parquet.Types.ByteArray && descr.PhysicalType() != parquet.Types.FixedLenByteArray,
+ ColumnChunkReader: newTypedColumnChunkReader(columnChunkReader{
+ descr: descr,
+ mem: mem,
+ bufferPool: bufferPool,
+ }),
+ values: memory.NewResizableBuffer(mem),
+ validBits: memory.NewResizableBuffer(mem),
+ mem: mem,
+ refCount: 1,
+		useValues: descr.PhysicalType() != parquet.Types.ByteArray && descr.PhysicalType() != parquet.Types.FixedLenByteArray,
}
}
diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go
index f800d19..acfac0e 100644
--- a/parquet/file/row_group_reader.go
+++ b/parquet/file/row_group_reader.go
@@ -22,7 +22,9 @@ import (
"github.com/apache/arrow-go/v18/internal/utils"
"github.com/apache/arrow-go/v18/parquet"
+ "github.com/apache/arrow-go/v18/parquet/internal/encoding"
"github.com/apache/arrow-go/v18/parquet/internal/encryption"
+ format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
"github.com/apache/arrow-go/v18/parquet/metadata"
"golang.org/x/xerrors"
)
@@ -39,7 +41,9 @@ type RowGroupReader struct {
props *parquet.ReaderProperties
fileDecryptor encryption.FileDecryptor
- bufferPool *sync.Pool
+ pageIndexReader *metadata.PageIndexReader
+ rgPageIndexReader *metadata.RowGroupPageIndexReader
+ bufferPool *sync.Pool
}
// MetaData returns the metadata of the current Row Group
@@ -67,7 +71,13 @@ func (r *RowGroupReader) Column(i int) (ColumnChunkReader, error) {
if err != nil {
		return nil, fmt.Errorf("parquet: unable to initialize page reader: %w", err)
}
-	return NewColumnReader(descr, pageRdr, r.props.Allocator(), r.bufferPool), nil
+ return newTypedColumnChunkReader(columnChunkReader{
+ descr: descr,
+ rdr: pageRdr,
+ mem: r.props.Allocator(),
+ bufferPool: r.bufferPool,
+ decoders: make(map[format.Encoding]encoding.TypedDecoder),
+ }), nil
}
func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
@@ -76,6 +86,14 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
return nil, err
}
+ if r.rgPageIndexReader == nil {
+		rgIdx, err := r.pageIndexReader.RowGroup(int(r.rgMetadata.Ordinal()))
+ if err != nil {
+ return nil, err
+ }
+ r.rgPageIndexReader = rgIdx
+ }
+
colStart := col.DataPageOffset()
	if col.HasDictionaryPage() && col.DictionaryPageOffset() > 0 && colStart > col.DictionaryPageOffset() {
colStart = col.DictionaryPageOffset()
@@ -106,7 +124,16 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
cryptoMetadata := col.CryptoMetadata()
if cryptoMetadata == nil {
-		return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), nil)
+ pr := &serializedPageReader{
+ r: stream,
+ chunk: col,
+ colIdx: i,
+ pgIndexReader: r.rgPageIndexReader,
+ maxPageHeaderSize: defaultMaxPageHeaderSize,
+ nrows: col.NumValues(),
+ mem: r.props.Allocator(),
+ }
+ return pr, pr.init(col.Compression(), nil)
}
if r.fileDecryptor == nil {
@@ -126,7 +153,17 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
		MetaDecryptor: r.fileDecryptor.GetFooterDecryptorForColumnMeta(""),
		DataDecryptor: r.fileDecryptor.GetFooterDecryptorForColumnData(""),
}
-	return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), &ctx)
+ pr := &serializedPageReader{
+ r: stream,
+ chunk: col,
+ colIdx: i,
+ pgIndexReader: r.rgPageIndexReader,
+ maxPageHeaderSize: defaultMaxPageHeaderSize,
+ nrows: col.NumValues(),
+ mem: r.props.Allocator(),
+ cryptoCtx: ctx,
+ }
+ return pr, pr.init(col.Compression(), &ctx)
}
// column encrypted with it's own key
@@ -140,5 +177,15 @@ func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
		MetaDecryptor: r.fileDecryptor.GetColumnMetaDecryptor(parquet.ColumnPath(columnPath).String(), string(columnKeyMeta), ""),
		DataDecryptor: r.fileDecryptor.GetColumnDataDecryptor(parquet.ColumnPath(columnPath).String(), string(columnKeyMeta), ""),
}
-	return NewPageReader(stream, col.NumValues(), col.Compression(), r.props.Allocator(), &ctx)
+ pr := &serializedPageReader{
+ r: stream,
+ chunk: col,
+ colIdx: i,
+ pgIndexReader: r.rgPageIndexReader,
+ maxPageHeaderSize: defaultMaxPageHeaderSize,
+ nrows: col.NumValues(),
+ mem: r.props.Allocator(),
+ cryptoCtx: ctx,
+ }
+ return pr, pr.init(col.Compression(), &ctx)
}
diff --git a/parquet/internal/testutils/pagebuilder.go b/parquet/internal/testutils/pagebuilder.go
index eb3abc4..41c2ba6 100644
--- a/parquet/internal/testutils/pagebuilder.go
+++ b/parquet/internal/testutils/pagebuilder.go
@@ -173,7 +173,7 @@ func (d *DictionaryPageBuilder) NumValues() int32 {
return d.numDictValues
}
-func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer, defLvls, repLvls []int16, maxDef, maxRep int16) file.Page {
+func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer, defLvls, repLvls []int16, maxDef, maxRep int16, firstRowIdx int64) file.Page {
num := 0
stream := encoding.NewBufferWriter(1024, mem)
@@ -196,9 +196,19 @@ func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, val
buf := stream.Finish()
if dataPageVersion == parquet.DataPageV1 {
-		return file.NewDataPageV1(buf, int32(num), e, builder.defLvlEncoding, builder.repLvlEncoding, int32(buf.Len()))
+		return file.NewDataPageV1WithConfig(buf, builder.defLvlEncoding, builder.repLvlEncoding, file.DataPageConfig{
+ Num: int32(num),
+ Encoding: e,
+ UncompressedSize: int32(buf.Len()),
+ FirstRowIndex: firstRowIdx,
+ })
}
-	return file.NewDataPageV2(buf, int32(num), 0, int32(num), e, int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), int32(buf.Len()), false)
+	return file.NewDataPageV2WithConfig(buf, 0, int32(num), int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), false, file.DataPageConfig{
+ Num: int32(num),
+ Encoding: e,
+ UncompressedSize: int32(buf.Len()),
+ FirstRowIndex: firstRowIdx,
+ })
}
func MakeDictPage(d *schema.Column, values interface{}, valuesPerPage []int, e parquet.Encoding) (*file.DictionaryPage, []encoding.Buffer) {
@@ -237,6 +247,26 @@ func (m *MockPageReader) Page() file.Page {
return m.TestData().Get("pages").Data().([]file.Page)[m.curpage-1]
}
+func (m *MockPageReader) SeekToPageWithRow(rowIdx int64) error {
+ pg1 := m.TestData().Get("pages").Data().([]file.Page)[0]
+ _, hasDict := pg1.(*file.DictionaryPage)
+ m.curpage = m.TestData().Get("row_map").Data().(func(int64) int)(rowIdx)
+ if hasDict {
+ m.curpage++
+ }
+
+ return nil
+}
+
+func (m *MockPageReader) GetDictionaryPage() (*file.DictionaryPage, error) {
+ pg1 := m.TestData().Get("pages").Data().([]file.Page)[0]
+ if dict, ok := pg1.(*file.DictionaryPage); ok {
+ return dict, nil
+ }
+
+ return nil, nil
+}
+
func (m *MockPageReader) Next() bool {
pageList := m.TestData().Get("pages").Data().([]file.Page)
m.curpage++
@@ -269,7 +299,7 @@ func PaginatePlain(version parquet.DataPageVersion, d *schema.Column, values ref
page := MakeDataPage(version, d,
			values.Slice(valueStart, valueStart+valuesPerPage[i]).Interface(),
			valuesPerPage[i], enc, nil, defLevels[defLvlStart:defLvlEnd],
-			repLevels[repLvlStart:repLvlEnd], maxDef, maxRep)
+			repLevels[repLvlStart:repLvlEnd], maxDef, maxRep, int64(i*lvlsPerPage))
valueStart += valuesPerPage[i]
pageList = append(pageList, page)
}
@@ -298,7 +328,7 @@ func PaginateDict(version parquet.DataPageVersion, d *schema.Column, values refl
repEnd = (i + 1) * lvlsPerPage
}
		page := MakeDataPage(version, d, nil, valuesPerPage[i], enc, rleIndices[i],
-			defLevels[defStart:defEnd], repLevels[repStart:repEnd], maxDef, maxRep)
+			defLevels[defStart:defEnd], repLevels[repStart:repEnd], maxDef, maxRep, int64(i*lvlsPerPage))
pages = append(pages, page)
}
return pages
diff --git a/parquet/metadata/page_index.go b/parquet/metadata/page_index.go
index 673f9ff..497a260 100644
--- a/parquet/metadata/page_index.go
+++ b/parquet/metadata/page_index.go
@@ -20,6 +20,7 @@ import (
"fmt"
"io"
"math"
+ "sync"
"github.com/JohnCGriffin/overflow"
"github.com/apache/arrow-go/v18/arrow"
@@ -293,6 +294,12 @@ type RowGroupPageIndexReader struct {
// buffers to hold raw bytes of page index
// will be lazily set when the corresponding page index is accessed
colIndexBuffer, offsetIndexBuffer []byte
+
+ // cache of column indexes
+ colIndexes map[int]ColumnIndex
+ // cache of offset indices
+ offsetIndices map[int]OffsetIndex
+ mx sync.Mutex
}
func (r *RowGroupPageIndexReader) GetColumnIndex(i int) (ColumnIndex, error) {
@@ -301,6 +308,17 @@ func (r *RowGroupPageIndexReader) GetColumnIndex(i int) (ColumnIndex, error) {
arrow.ErrInvalid, i)
}
+ r.mx.Lock()
+ defer r.mx.Unlock()
+
+ if r.colIndexes == nil {
+ r.colIndexes = make(map[int]ColumnIndex)
+ } else {
+ if idx, ok := r.colIndexes[i]; ok {
+ return idx, nil
+ }
+ }
+
colChunk, err := r.rowGroupMetadata.ColumnChunk(i)
if err != nil {
return nil, err
@@ -334,7 +352,9 @@ func (r *RowGroupPageIndexReader) GetColumnIndex(i int) (ColumnIndex, error) {
int16(i), encryption.ColumnIndexModule)
}
-	return NewColumnIndex(descr, r.colIndexBuffer[bufferOffset:], r.props, decryptor), nil
+	idx := NewColumnIndex(descr, r.colIndexBuffer[bufferOffset:], r.props, decryptor)
+ r.colIndexes[i] = idx
+ return idx, nil
}
func (r *RowGroupPageIndexReader) GetOffsetIndex(i int) (OffsetIndex, error) {
@@ -343,6 +363,17 @@ func (r *RowGroupPageIndexReader) GetOffsetIndex(i int) (OffsetIndex, error) {
arrow.ErrInvalid, i)
}
+ r.mx.Lock()
+ defer r.mx.Unlock()
+
+ if r.offsetIndices == nil {
+ r.offsetIndices = make(map[int]OffsetIndex)
+ } else {
+ if idx, ok := r.offsetIndices[i]; ok {
+ return idx, nil
+ }
+ }
+
colChunk, err := r.rowGroupMetadata.ColumnChunk(i)
if err != nil {
return nil, err
@@ -375,7 +406,9 @@ func (r *RowGroupPageIndexReader) GetOffsetIndex(i int) (OffsetIndex, error) {
int16(i), encryption.OffsetIndexModule)
}
-	return NewOffsetIndex(r.offsetIndexBuffer[bufferOffset:], r.props, decryptor), nil
+	oidx := NewOffsetIndex(r.offsetIndexBuffer[bufferOffset:], r.props, decryptor)
+ r.offsetIndices[i] = oidx
+ return oidx, nil
}
// PageIndexReader is a read-only object for retrieving the Column and Offset indexes
@@ -412,36 +445,27 @@ func determinePageIndexRangesInRowGroup(rgMeta *RowGroupMetaData, cols []int32)
var colChunk *ColumnChunkMetaData
if len(cols) == 0 {
+ cols = make([]int32, rgMeta.NumColumns())
for i := 0; i < rgMeta.NumColumns(); i++ {
- if colChunk, err = rgMeta.ColumnChunk(i); err != nil {
- return
- }
-
-		if err = mergeRange(colChunk.GetColumnIndexLocation(), &ciStart, &ciEnd); err != nil {
- return
- }
+ cols[i] = int32(i)
+ }
+ }
-		if err = mergeRange(colChunk.GetOffsetIndexLocation(), &oiStart, &oiEnd); err != nil {
- return
- }
+ for _, i := range cols {
+ if i < 0 || i >= int32(rgMeta.NumColumns()) {
+			return rng, fmt.Errorf("%w: invalid column ordinal %d", arrow.ErrIndex, i)
}
- } else {
- for _, i := range cols {
- if i < 0 || i >= int32(rgMeta.NumColumns()) {
- return rng, fmt.Errorf("%w: invalid column
ordinal %d", arrow.ErrIndex, i)
- }
- if colChunk, err = rgMeta.ColumnChunk(int(i)); err !=
nil {
- return
- }
+ if colChunk, _ = rgMeta.ColumnChunk(int(i)); colChunk == nil {
+ continue
+ }
-			if err = mergeRange(colChunk.GetColumnIndexLocation(), &ciStart, &ciEnd); err != nil {
- return
- }
+		if err = mergeRange(colChunk.GetColumnIndexLocation(), &ciStart, &ciEnd); err != nil {
+ return
+ }
-			if err = mergeRange(colChunk.GetOffsetIndexLocation(), &oiStart, &oiEnd); err != nil {
- return
- }
+		if err = mergeRange(colChunk.GetOffsetIndexLocation(), &oiStart, &oiEnd); err != nil {
+ return
}
}
diff --git a/parquet/reader_properties.go b/parquet/reader_properties.go
index c9b72f9..87d4b7c 100644
--- a/parquet/reader_properties.go
+++ b/parquet/reader_properties.go
@@ -50,6 +50,9 @@ type ReaderProperties struct {
type BufferedReader interface {
Peek(int) ([]byte, error)
Discard(int) (int, error)
+ Outer() utils.Reader
+ BufferSize() int
+ Reset(utils.Reader)
io.Reader
}