This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 6dc6926  feat(parquet/file): Add SeekToRow for Column readers (#283)
6dc6926 is described below

commit 6dc6926acfe6e70335511f2df67f57006f8bddec
Author: Matt Topol <[email protected]>
AuthorDate: Thu Feb 20 11:39:10 2025 -0500

    feat(parquet/file): Add SeekToRow for Column readers (#283)
    
    ### Rationale for this change
    Addressing the comments in
    https://github.com/apache/arrow-go/issues/278#issuecomment-2657578697 to
    allow for optimizing reads by skipping entire pages and leveraging the
    offset index if it exists.
    
    ### What changes are included in this PR?
    Deprecating the old `NewColumnReader` and `NewPageReader` functions,
    as they really aren't safe to use outside of the package and have
    proved difficult to evolve without breaking changes. Instead, users
    should rely on the `RowGroupReader` to create the column readers and
    page readers, which is generally what consumers already do.
    
    Adding a `SeekToRow` method on the ColumnChunkReader to allow skipping
    to a particular row in the column chunk (which also allows quickly
    resetting back to the beginning of a column!), along with a
    `SeekToPageWithRow` method on the page reader. Also updates the `Skip`
    method to properly skip *rows* in a repeated column, not just values.
    
    ### Are these changes tested?
    Yes, tests are included.
    
    ### Are there any user-facing changes?
    The new methods, plus the `Skip` change noted above: for repeated
    columns it now skips rows rather than individual values. The
    deprecated functions are not removed yet.
---
 internal/utils/buf_reader.go                 |  26 ++-
 parquet/file/column_reader.go                | 274 +++++++++++++++++++++------
 parquet/file/column_reader_test.go           | 175 ++++++++++++++++-
 parquet/file/column_reader_types.gen.go      |  83 ++------
 parquet/file/column_reader_types.gen.go.tmpl |  16 +-
 parquet/file/file_reader.go                  |  32 ++--
 parquet/file/file_reader_mmap_windows.go     |   2 +-
 parquet/file/page_reader.go                  | 241 +++++++++++++++++++----
 parquet/file/record_reader.go                |  16 +-
 parquet/file/row_group_reader.go             |  57 +++++-
 parquet/internal/testutils/pagebuilder.go    |  40 +++-
 parquet/metadata/page_index.go               |  76 +++++---
 parquet/reader_properties.go                 |   3 +
 13 files changed, 790 insertions(+), 251 deletions(-)

diff --git a/internal/utils/buf_reader.go b/internal/utils/buf_reader.go
index 0b2381d..2a9aa9d 100644
--- a/internal/utils/buf_reader.go
+++ b/internal/utils/buf_reader.go
@@ -23,6 +23,11 @@ import (
        "io"
 )
 
+type Reader interface {
+       io.ReadSeeker
+       io.ReaderAt
+}
+
 // bufferedReader is similar to bufio.Reader except
 // it will expand the buffer if necessary when asked to Peek
 // more bytes than are in the buffer
@@ -30,21 +35,14 @@ type bufferedReader struct {
        bufferSz int
        buf      []byte
        r, w     int
-       rd       io.Reader
+       rd       Reader
        err      error
 }
 
 // NewBufferedReader returns a buffered reader with similar semantics to 
bufio.Reader
 // except Peek will expand the internal buffer if needed rather than return
 // an error.
-func NewBufferedReader(rd io.Reader, sz int) *bufferedReader {
-       // if rd is already a buffered reader whose buffer is >= the requested 
size
-       // then just return it as is. no need to make a new object.
-       b, ok := rd.(*bufferedReader)
-       if ok && len(b.buf) >= sz {
-               return b
-       }
-
+func NewBufferedReader(rd Reader, sz int) *bufferedReader {
        r := &bufferedReader{
                rd: rd,
        }
@@ -52,6 +50,14 @@ func NewBufferedReader(rd io.Reader, sz int) *bufferedReader 
{
        return r
 }
 
+func (b *bufferedReader) Outer() Reader { return b.rd }
+
+func (b *bufferedReader) Reset(rd Reader) {
+       b.resetBuffer()
+       b.rd = rd
+       b.r, b.w = 0, 0
+}
+
 func (b *bufferedReader) resetBuffer() {
        if b.buf == nil {
                b.buf = make([]byte, b.bufferSz)
@@ -97,6 +103,8 @@ func (b *bufferedReader) readErr() error {
        return err
 }
 
+func (b *bufferedReader) BufferSize() int { return b.bufferSz }
+
 // Buffered returns the number of bytes currently buffered
 func (b *bufferedReader) Buffered() int { return b.w - b.r }
 
diff --git a/parquet/file/column_reader.go b/parquet/file/column_reader.go
index 0fb4ed1..5faf8bc 100644
--- a/parquet/file/column_reader.go
+++ b/parquet/file/column_reader.go
@@ -87,6 +87,9 @@ type ColumnChunkReader interface {
        // it encountered. Otherwise this will be nil if it's just the end of 
the
        // column
        Err() error
+
+       SeekToRow(rowIdx int64) error
+
        // Skip buffered values
        consumeBufferedValues(int64)
        // number of available buffered values that have not been decoded yet
@@ -113,7 +116,8 @@ type ColumnChunkReader interface {
 }
 
 type columnChunkReader struct {
-       descr             *schema.Column
+       descr *schema.Column
+
        rdr               PageReader
        repetitionDecoder encoding.LevelDecoder
        definitionDecoder encoding.LevelDecoder
@@ -135,16 +139,50 @@ type columnChunkReader struct {
        // is set when an error is encountered
        err          error
        defLvlBuffer []int16
+       repLvlBuffer []int16
 
        newDictionary bool
 }
 
+func newTypedColumnChunkReader(base columnChunkReader) ColumnChunkReader {
+       switch base.descr.PhysicalType() {
+       case parquet.Types.FixedLenByteArray:
+               base.decoderTraits = &encoding.FixedLenByteArrayDecoderTraits
+               return &FixedLenByteArrayColumnChunkReader{base}
+       case parquet.Types.Float:
+               base.decoderTraits = &encoding.Float32DecoderTraits
+               return &Float32ColumnChunkReader{base}
+       case parquet.Types.Double:
+               base.decoderTraits = &encoding.Float64DecoderTraits
+               return &Float64ColumnChunkReader{base}
+       case parquet.Types.ByteArray:
+               base.decoderTraits = &encoding.ByteArrayDecoderTraits
+               return &ByteArrayColumnChunkReader{base}
+       case parquet.Types.Int32:
+               base.decoderTraits = &encoding.Int32DecoderTraits
+               return &Int32ColumnChunkReader{base}
+       case parquet.Types.Int64:
+               base.decoderTraits = &encoding.Int64DecoderTraits
+               return &Int64ColumnChunkReader{base}
+       case parquet.Types.Int96:
+               base.decoderTraits = &encoding.Int96DecoderTraits
+               return &Int96ColumnChunkReader{base}
+       case parquet.Types.Boolean:
+               base.decoderTraits = &encoding.BooleanDecoderTraits
+               return &BooleanColumnChunkReader{base}
+       }
+       return nil
+}
+
 // NewColumnReader returns a column reader for the provided column initialized 
with the given pagereader that will
 // provide the pages of data for this column. The type is determined from the 
column passed in.
 //
 // In addition to the page reader and allocator, a pointer to a shared 
sync.Pool is expected to provide buffers for temporary
 // usage to minimize allocations. The bufferPool should provide *memory.Buffer 
objects that can be resized as necessary, buffers
 // should have `ResizeNoShrink(0)` called on them before being put back into 
the pool.
+//
+// Deprecated: This function will be removed from the public interface soon as 
it is currently unsafe to use
+// outside of this package.
 func NewColumnReader(descr *schema.Column, pageReader PageReader, mem 
memory.Allocator, bufferPool *sync.Pool) ColumnChunkReader {
        base := columnChunkReader{descr: descr, rdr: pageReader, mem: mem, 
decoders: make(map[format.Encoding]encoding.TypedDecoder), bufferPool: 
bufferPool}
        switch descr.PhysicalType() {
@@ -197,6 +235,15 @@ func (c *columnChunkReader) getDefLvlBuffer(sz int64) 
[]int16 {
        return c.defLvlBuffer[:sz]
 }
 
+func (c *columnChunkReader) getRepLvlBuffer(sz int64) []int16 {
+       if int64(cap(c.repLvlBuffer)) < sz {
+               c.repLvlBuffer = make([]int16, sz)
+               return c.repLvlBuffer
+       }
+
+       return c.repLvlBuffer[:sz]
+}
+
 // HasNext returns whether there is more data to be read in this column
 // and row group.
 func (c *columnChunkReader) HasNext() bool {
@@ -206,6 +253,23 @@ func (c *columnChunkReader) HasNext() bool {
        return true
 }
 
+func (c *columnChunkReader) readDictionary() error {
+       if c.newDictionary {
+               return nil
+       }
+
+       page, err := c.pager().GetDictionaryPage()
+       if err != nil {
+               return err
+       }
+
+       if page != nil {
+               return c.configureDict(page)
+       }
+
+       return nil
+}
+
 func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
        enc := page.encoding
        if enc == format.Encoding_PLAIN_DICTIONARY || enc == 
format.Encoding_PLAIN {
@@ -233,6 +297,30 @@ func (c *columnChunkReader) configureDict(page 
*DictionaryPage) error {
        return nil
 }
 
+func (c *columnChunkReader) processPage() (bool, error) {
+       var (
+               err        error
+               lvlByteLen int64
+       )
+       switch p := c.curPage.(type) {
+       case *DictionaryPage:
+               return false, c.configureDict(p)
+       case *DataPageV1:
+               lvlByteLen, err = c.initLevelDecodersV1(p, p.repLvlEncoding, 
p.defLvlEncoding)
+       case *DataPageV2:
+               lvlByteLen, err = c.initLevelDecodersV2(p)
+       default:
+               // we can skip non-data pages
+               return false, nil
+       }
+
+       if err != nil {
+               return true, err
+       }
+
+       return true, c.initDataDecoder(c.curPage, lvlByteLen)
+}
+
 // read a new page from the page reader
 func (c *columnChunkReader) readNewPage() bool {
        for c.rdr.Next() { // keep going until we get a data page
@@ -241,31 +329,15 @@ func (c *columnChunkReader) readNewPage() bool {
                        break
                }
 
-               var lvlByteLen int64
-               switch p := c.curPage.(type) {
-               case *DictionaryPage:
-                       if err := c.configureDict(p); err != nil {
-                               c.err = err
-                               return false
-                       }
-                       continue
-               case *DataPageV1:
-                       lvlByteLen, c.err = c.initLevelDecodersV1(p, 
p.repLvlEncoding, p.defLvlEncoding)
-                       if c.err != nil {
-                               return false
-                       }
-               case *DataPageV2:
-                       lvlByteLen, c.err = c.initLevelDecodersV2(p)
-                       if c.err != nil {
-                               return false
-                       }
-               default:
-                       // we can skip non-data pages
-                       continue
+               gotDataPage, err := c.processPage()
+               if err != nil {
+                       c.err = err
+                       return false
                }
 
-               c.err = c.initDataDecoder(c.curPage, lvlByteLen)
-               return c.err == nil
+               if gotDataPage {
+                       return true
+               }
        }
        c.err = c.rdr.Err()
        return false
@@ -283,6 +355,9 @@ func (c *columnChunkReader) initLevelDecodersV2(page 
*DataPageV2) (int64, error)
 
        if c.descr.MaxRepetitionLevel() > 0 {
                c.repetitionDecoder.SetDataV2(page.repLvlByteLen, 
c.descr.MaxRepetitionLevel(), int(c.numBuffered), buf)
+               if c.repLvlBuffer != nil {
+                       c.repLvlBuffer = c.repLvlBuffer[:0]
+               }
        }
        // ARROW-17453: Some writers will write repetition levels even when
        // the max repetition level is 0, so we should respect the value
@@ -339,6 +414,11 @@ func (c *columnChunkReader) initDataDecoder(page Page, 
lvlByteLen int64) error {
        encoding := page.Encoding()
 
        if isDictIndexEncoding(encoding) {
+               // if we're seeking or otherwise skipping pages, we may not 
have read
+               // the dictionary page in yet, so let's ensure we got it if one 
exists
+               if err := c.readDictionary(); err != nil {
+                       return err
+               }
                encoding = format.Encoding_RLE_DICTIONARY
        }
 
@@ -396,6 +476,9 @@ func (c *columnChunkReader) readRepetitionLevels(levels 
[]int16) int {
                return 0
        }
 
+       if len(c.repLvlBuffer) > 0 {
+               return copy(levels, c.repLvlBuffer[c.numDecoded:])
+       }
        nlevels, _ := c.repetitionDecoder.Decode(levels)
        return nlevels
 }
@@ -436,49 +519,114 @@ func (c *columnChunkReader) determineNumToRead(batchLen 
int64, defLvls, repLvls
        return
 }
 
-// skipValues some number of rows using readFn as the function to read the 
data and throw it away.
-// If we can skipValues a whole page based on its metadata, then we do so, 
otherwise we read the
-// page until we have skipped the number of rows desired.
-func (c *columnChunkReader) skipValues(nvalues int64, readFn func(batch int64, 
buf []byte) (int64, error)) (int64, error) {
-       var err error
-       toskip := nvalues
-       for c.HasNext() && toskip > 0 {
-               // if number to skip is more than the number of undecoded 
values, skip the page
-               if toskip > (c.numBuffered - c.numDecoded) {
-                       toskip -= c.numBuffered - c.numDecoded
-                       c.numDecoded = c.numBuffered
-               } else {
-                       var (
-                               batchSize int64 = 1024
-                               valsRead  int64 = 0
-                       )
-
-                       scratch := c.bufferPool.Get().(*memory.Buffer)
-                       defer func() {
-                               scratch.ResizeNoShrink(0)
-                               c.bufferPool.Put(scratch)
-                       }()
-                       bufMult := 1
-                       if c.descr.PhysicalType() == parquet.Types.Boolean {
-                               // for bools, BytesRequired returns 1 byte per 
8 bool, but casting []byte to []bool requires 1 byte per 1 bool
-                               bufMult = 8
+// SeekToRow will seek to the row index provided in the column chunk. If
+// the metadata contains an OffsetIndex for skipping pages based on row indexes
+// then the pager will use that to skip to the correct page.
+//
+// If there is no OffsetIndex, then the pager will read each page until it
+// finds the page that contains the desired row index, and the Column Chunk
+// reader will discard values until it reaches the desired row index according
+// to the definition and repetition levels.
+func (c *columnChunkReader) SeekToRow(rowIdx int64) error {
+       if err := c.pager().SeekToPageWithRow(rowIdx); err != nil {
+               return err
+       }
+
+       c.numBuffered, c.numDecoded = 0, 0
+       c.curPage = c.rdr.Page()
+       if c.curPage == nil {
+               c.err = c.rdr.Err()
+               return c.err
+       }
+
+       gotDataPage, err := c.processPage()
+       if err != nil {
+               c.err = err
+               return err
+       }
+
+       if !gotDataPage {
+               c.readNewPage()
+       }
+
+       return c.skipRows(rowIdx - c.curPage.(DataPage).FirstRowIndex())
+}
+
+func (c *columnChunkReader) skipRows(nrows int64) error {
+       toSkip := nrows
+       for c.HasNext() && toSkip > 0 {
+               // if there are no repetition levels, then this is easy! each 
level
+               // is one row so we just use the definition levels to determine
+               // the number of physical values to discard!
+               if c.descr.MaxRepetitionLevel() == 0 {
+                       if toSkip >= (c.numBuffered - c.numDecoded) {
+                               toSkip -= c.numBuffered - c.numDecoded
+                               c.numDecoded = c.numBuffered
+                               continue
+                       }
+
+                       ndefs, nvals, err := c.determineNumToRead(toSkip, nil, 
nil)
+                       if err != nil {
+                               c.err = err
+                               return err
+                       }
+
+                       skipped, err := c.curDecoder.Discard(int(nvals))
+                       if err != nil {
+                               c.err = err
+                               return err
                        }
-                       
scratch.Reserve(c.decoderTraits.BytesRequired(int(batchSize) * bufMult))
-
-                       for {
-                               batchSize = utils.Min(batchSize, toskip)
-                               valsRead, err = readFn(batchSize, scratch.Buf())
-                               toskip -= valsRead
-                               if valsRead <= 0 || toskip <= 0 || err != nil {
-                                       break
+
+                       skipped = max(ndefs, skipped)
+
+                       toSkip -= int64(skipped)
+                       c.consumeBufferedValues(int64(skipped))
+               } else {
+                       // with repetition levels, we have to check them to 
determine
+                       // how many rows to skip. we can't just skip the number 
of values
+                       // because there could be multiple values per row. So 
we read in
+                       // the repetition levels for the entire page at once 
and then go
+                       // through them to find the right row.
+                       repLvls := c.getRepLvlBuffer(c.numBuffered)
+                       nreps, _ := c.repetitionDecoder.Decode(repLvls)
+
+                       rowsSkipped := int64(0)
+                       levelsToSkip := -1
+                       for i, def := range repLvls[:nreps] {
+                               if def == 0 {
+                                       if rowsSkipped == toSkip {
+                                               levelsToSkip = i
+                                               break
+                                       }
+                                       rowsSkipped++
                                }
                        }
+
+                       if levelsToSkip == -1 {
+                               toSkip -= rowsSkipped
+                               c.numBuffered, c.numDecoded = 0, 0
+                               continue
+                       }
+
+                       var valuesToSkip int64
+                       if c.descr.MaxDefinitionLevel() > 0 {
+                               defLvls := 
c.getDefLvlBuffer(int64(levelsToSkip))
+                               _, valuesToSkip = 
c.readDefinitionLevels(defLvls)
+                       } else {
+                               valuesToSkip = int64(levelsToSkip)
+                       }
+
+                       skipped, err := c.curDecoder.Discard(int(valuesToSkip))
+                       if err != nil {
+                               c.err = err
+                               return err
+                       }
+
+                       toSkip -= int64(skipped)
+                       c.consumeBufferedValues(int64(levelsToSkip))
                }
        }
-       if c.err != nil {
-               err = c.err
-       }
-       return nvalues - toskip, err
+       return nil
 }
 
 type readerFunc func(int64, int64) (int, error)
@@ -498,7 +646,7 @@ func (c *columnChunkReader) readBatch(batchSize int64, 
defLvls, repLvls []int16,
                toRead int64
        )
 
-       for c.HasNext() && totalLvls < batchSize && err == nil {
+       for totalLvls < batchSize && c.HasNext() && err == nil {
                if defLvls != nil {
                        defs = defLvls[totalLvls:]
                }
diff --git a/parquet/file/column_reader_test.go 
b/parquet/file/column_reader_test.go
index dd59955..f96156a 100644
--- a/parquet/file/column_reader_test.go
+++ b/parquet/file/column_reader_test.go
@@ -17,6 +17,8 @@
 package file_test
 
 import (
+       "bytes"
+       "fmt"
        "math"
        "math/rand"
        "reflect"
@@ -24,13 +26,17 @@ import (
        "sync"
        "testing"
 
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/internal/utils"
        "github.com/apache/arrow-go/v18/parquet"
        "github.com/apache/arrow-go/v18/parquet/file"
        "github.com/apache/arrow-go/v18/parquet/internal/testutils"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
        "github.com/apache/arrow-go/v18/parquet/schema"
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
        "github.com/stretchr/testify/suite"
 )
 
@@ -574,7 +580,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() 
{
 
        p.Run("Dict: Plain, Data: RLEDict", func() {
                dictPage := file.NewDictionaryPage(dummy, 0, 
parquet.Encodings.Plain)
-               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0, 0)
 
                p.pages = append(p.pages, dictPage, dataPage)
                p.initReader(descr)
@@ -585,7 +591,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() 
{
 
        p.Run("Dict: Plain Dictionary, Data: Plain Dictionary", func() {
                dictPage := file.NewDictionaryPage(dummy, 0, 
parquet.Encodings.PlainDict)
-               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.PlainDict, dummy, nil, nil, 0, 0)
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.PlainDict, dummy, nil, nil, 0, 0, 0)
                p.pages = append(p.pages, dictPage, dataPage)
                p.initReader(descr)
                p.NotPanics(func() { p.reader.HasNext() })
@@ -594,7 +600,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() 
{
        })
 
        p.Run("Panic if dict page not first", func() {
-               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0)
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.RLEDict, dummy, nil, nil, 0, 0, 0)
                p.pages = append(p.pages, dataPage)
                p.initReader(descr)
                p.NotPanics(func() { p.False(p.reader.HasNext()) })
@@ -622,7 +628,7 @@ func (p *PrimitiveReaderSuite) TestDictionaryEncodedPages() 
{
        })
 
        p.Run("Unsupported encoding", func() {
-               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.DeltaByteArray, dummy, nil, nil, 0, 0)
+               dataPage := testutils.MakeDataPage(p.dataPageVersion, descr, 
nil, 0, parquet.Encodings.DeltaByteArray, dummy, nil, nil, 0, 0, 0)
                p.pages = append(p.pages, dataPage)
                p.initReader(descr)
                p.Panics(func() { p.reader.HasNext() })
@@ -633,6 +639,82 @@ func (p *PrimitiveReaderSuite) 
TestDictionaryEncodedPages() {
        p.pages = p.pages[:2]
 }
 
+func (p *PrimitiveReaderSuite) TestSeekToRowRequired() {
+       const (
+               levelsPerPage int = 100
+               npages        int = 50
+       )
+
+       for _, enc := range []parquet.Encoding{parquet.Encodings.Plain, 
parquet.Encodings.RLEDict} {
+               p.maxDefLvl, p.maxRepLvl = 0, 0
+               typ := schema.NewInt32Node("a", parquet.Repetitions.Required, 
-1)
+               d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+
+               p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = 
makePages(p.dataPageVersion,
+                       d, npages, levelsPerPage, reflect.TypeOf(int32(0)), enc)
+               p.nlevels = npages * levelsPerPage
+               p.initReader(d)
+
+               p.pager.(*testutils.MockPageReader).TestData().Set("row_map", 
func(idx int64) int {
+                       return (int(idx) / levelsPerPage) + 1
+               })
+
+               // check seek back to beginning
+               p.checkResults(reflect.TypeOf(int32(0)))
+               p.Require().NoError(p.reader.SeekToRow(0))
+               p.checkResults(reflect.TypeOf(int32(0)))
+
+               p.Require().NoError(p.reader.SeekToRow(550))
+               p.nvalues -= 550
+               p.nlevels -= 550
+               p.values = p.values.Slice(550, p.values.Len())
+               p.checkResults(reflect.TypeOf(int32(0)))
+               p.clear()
+       }
+}
+
+func (p *PrimitiveReaderSuite) TestSeekToRowOptional() {
+       const (
+               levelsPerPage int = 100
+               npages        int = 50
+       )
+
+       for _, enc := range []parquet.Encoding{parquet.Encodings.Plain, 
parquet.Encodings.RLEDict} {
+               p.maxDefLvl, p.maxRepLvl = 4, 0
+               typ := schema.NewInt32Node("a", parquet.Repetitions.Optional, 
-1)
+               d := schema.NewColumn(typ, p.maxDefLvl, p.maxRepLvl)
+
+               p.pages, p.nvalues, p.values, p.defLevels, p.repLevels = 
makePages(p.dataPageVersion,
+                       d, npages, levelsPerPage, reflect.TypeOf(int32(0)), enc)
+               p.nlevels = npages * levelsPerPage
+               p.initReader(d)
+
+               p.pager.(*testutils.MockPageReader).TestData().Set("row_map", 
func(idx int64) int {
+                       return (int(idx) / levelsPerPage) + 1
+               })
+
+               // check seek back to beginning
+               p.checkResults(reflect.TypeOf(int32(0)))
+               p.Require().NoError(p.reader.SeekToRow(0))
+               p.checkResults(reflect.TypeOf(int32(0)))
+
+               p.Require().NoError(p.reader.SeekToRow(550))
+               realValuesSkipped := 0
+               for i := 0; i < 550; i++ {
+                       if p.defLevels[i] == p.maxDefLvl {
+                               realValuesSkipped++
+                       }
+               }
+
+               p.nvalues -= realValuesSkipped
+               p.values = p.values.Slice(realValuesSkipped, p.values.Len())
+               p.nlevels -= 550
+               p.defLevels = p.defLevels[550:]
+               p.checkResults(reflect.TypeOf(int32(0)))
+               p.clear()
+       }
+}
+
 func TestPrimitiveReader(t *testing.T) {
        t.Parallel()
        t.Run("datapage v1", func(t *testing.T) {
@@ -642,3 +724,88 @@ func TestPrimitiveReader(t *testing.T) {
                suite.Run(t, &PrimitiveReaderSuite{dataPageVersion: 
parquet.DataPageV2})
        })
 }
+
+func TestFullSeekRow(t *testing.T) {
+       mem := memory.DefaultAllocator
+
+       for _, dataPageVersion := range 
[]parquet.DataPageVersion{parquet.DataPageV2, parquet.DataPageV1} {
+               t.Run(fmt.Sprintf("DataPageVersion=%v", dataPageVersion+1), 
func(t *testing.T) {
+
+                       props := 
parquet.NewWriterProperties(parquet.WithAllocator(mem),
+                               parquet.WithDataPageVersion(dataPageVersion), 
parquet.WithDataPageSize(1),
+                               parquet.WithPageIndexEnabled(true))
+
+                       sc := arrow.NewSchema([]arrow.Field{
+                               {Name: "c0", Type: arrow.PrimitiveTypes.Int64, 
Nullable: true},
+                               {Name: "c1", Type: arrow.BinaryTypes.String, 
Nullable: true},
+                               {Name: "c2", Type: 
arrow.ListOf(arrow.PrimitiveTypes.Int64), Nullable: true},
+                       }, nil)
+
+                       tbl, err := array.TableFromJSON(mem, sc, []string{`[
+                               {"c0": 1,    "c1": "a",  "c2": [1]},
+                               {"c0": 2,    "c1": "b",  "c2": [1, 2]},
+                               {"c0": 3,    "c1": "c",  "c2": [null]},
+                               {"c0": null, "c1": "d",  "c2": []},
+                               {"c0": 5,    "c1": null, "c2": [3, 3, 3]},
+                               {"c0": 6,    "c1": "f",  "c2": null}
+                       ]`})
+                       require.NoError(t, err)
+                       defer tbl.Release()
+
+                       schema := tbl.Schema()
+                       arrWriterProps := 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))
+
+                       var buf bytes.Buffer
+                       wr, err := pqarrow.NewFileWriter(schema, &buf, props, 
arrWriterProps)
+                       require.NoError(t, err)
+
+                       require.NoError(t, wr.WriteTable(tbl, tbl.NumRows()))
+                       require.NoError(t, wr.Close())
+
+                       rdr, err := 
file.NewParquetReader(bytes.NewReader(buf.Bytes()),
+                               
file.WithReadProps(parquet.NewReaderProperties(mem)))
+                       require.NoError(t, err)
+                       defer rdr.Close()
+
+                       rgr := rdr.RowGroup(0)
+                       col, err := rgr.Column(0)
+                       require.NoError(t, err)
+
+                       icr := col.(*file.Int64ColumnChunkReader)
+                       require.NoError(t, icr.SeekToRow(3))
+
+                       vals := make([]int64, 5)
+                       defLvls := make([]int16, 5)
+                       repLvls := make([]int16, 5)
+
+                       total, read, err := icr.ReadBatch(5, vals, defLvls, 
repLvls)
+                       require.NoError(t, err)
+
+                       assert.EqualValues(t, 3, total)
+                       assert.EqualValues(t, 2, read)
+
+                       assert.Equal(t, []int64{5, 6}, vals[:read])
+                       assert.Equal(t, []int16{0, 1, 1}, defLvls[:total])
+                       assert.Equal(t, []int16{0, 0, 0}, repLvls[:total])
+
+                       col2, err := rgr.Column(2)
+                       require.NoError(t, err)
+
+                       icr = col2.(*file.Int64ColumnChunkReader)
+                       require.NoError(t, icr.SeekToRow(3))
+
+                       total, read, err = icr.ReadBatch(5, vals, defLvls, 
repLvls)
+                       require.NoError(t, err)
+
+                       // 5 definition levels are read for the last 3 rows
+                       // because of the repetitions
+                       assert.EqualValues(t, 5, total)
+                       // only 3 physical values though
+                       assert.EqualValues(t, 3, read)
+
+                       assert.Equal(t, []int64{3, 3, 3}, vals[:read])
+                       assert.Equal(t, []int16{1, 3, 3, 3, 0}, defLvls[:total])
+                       assert.Equal(t, []int16{0, 0, 1, 1, 0}, repLvls[:total])
+               })
+       }
+}
diff --git a/parquet/file/column_reader_types.gen.go 
b/parquet/file/column_reader_types.gen.go
index 08202e5..a60cf8e 100644
--- a/parquet/file/column_reader_types.gen.go
+++ b/parquet/file/column_reader_types.gen.go
@@ -19,9 +19,6 @@
 package file
 
 import (
-       "unsafe"
-
-       "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/parquet"
        "github.com/apache/arrow-go/v18/parquet/internal/encoding"
 )
@@ -35,14 +32,8 @@ type Int32ColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *Int32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               arrow.Int32Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf))
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
@@ -69,14 +60,8 @@ type Int64ColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *Int64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               arrow.Int64Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf))
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
@@ -103,14 +88,8 @@ type Int96ColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *Int96ColumnChunkReader) Skip(nvalues int64) (int64, error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               parquet.Int96Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf))
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
@@ -137,14 +116,8 @@ type Float32ColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *Float32ColumnChunkReader) Skip(nvalues int64) (int64, error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               arrow.Float32Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf))
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
@@ -171,14 +144,8 @@ type Float64ColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *Float64ColumnChunkReader) Skip(nvalues int64) (int64, error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               arrow.Float64Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf))
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
@@ -205,14 +172,8 @@ type BooleanColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *BooleanColumnChunkReader) Skip(nvalues int64) (int64, error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               *(*[]bool)(unsafe.Pointer(&buf)),
-                               nil,
-                               nil)
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
@@ -239,14 +200,8 @@ type ByteArrayColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *ByteArrayColumnChunkReader) Skip(nvalues int64) (int64, error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               parquet.ByteArrayTraits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf))
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
@@ -273,14 +228,8 @@ type FixedLenByteArrayColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *FixedLenByteArrayColumnChunkReader) Skip(nvalues int64) (int64, 
error) {
-       return cr.columnChunkReader.skipValues(nvalues,
-               func(batch int64, buf []byte) (int64, error) {
-                       vals, _, err := cr.ReadBatch(batch,
-                               
parquet.FixedLenByteArrayTraits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf),
-                               arrow.Int16Traits.CastFromBytes(buf))
-                       return vals, err
-               })
+       err := cr.columnChunkReader.skipRows(nvalues)
+       return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
diff --git a/parquet/file/column_reader_types.gen.go.tmpl 
b/parquet/file/column_reader_types.gen.go.tmpl
index 3b9963a..a9d5bc0 100644
--- a/parquet/file/column_reader_types.gen.go.tmpl
+++ b/parquet/file/column_reader_types.gen.go.tmpl
@@ -31,20 +31,8 @@ type {{.Name}}ColumnChunkReader struct {
 // Skip skips the next nvalues so that the next call to ReadBatch
 // will start reading *after* the skipped values.
 func (cr *{{.Name}}ColumnChunkReader) Skip(nvalues int64) (int64, error) {
-  return cr.columnChunkReader.skipValues(nvalues,
-    func(batch int64, buf []byte) (int64, error) {
-      vals, _, err := cr.ReadBatch(batch,
-        {{- if ne .Name "Boolean"}}
-        {{.prefix}}.{{.Name}}Traits.CastFromBytes(buf),
-        arrow.Int16Traits.CastFromBytes(buf),
-        arrow.Int16Traits.CastFromBytes(buf))
-        {{- else}}
-        *(*[]bool)(unsafe.Pointer(&buf)),
-        nil,
-        nil)
-        {{- end}}
-      return vals, err
-    })
+  err := cr.columnChunkReader.skipRows(nvalues)
+  return nvalues, err
 }
 
 // ReadBatch reads batchSize values from the column.
diff --git a/parquet/file/file_reader.go b/parquet/file/file_reader.go
index 7807ac6..ddccd79 100644
--- a/parquet/file/file_reader.go
+++ b/parquet/file/file_reader.go
@@ -120,7 +120,16 @@ func NewParquetReader(r parquet.ReaderAtSeeker, opts 
...ReadOption) (*Reader, er
        }
 
        if f.metadata == nil {
-               return f, f.parseMetaData()
+               if err := f.parseMetaData(); err != nil {
+                       return nil, err
+               }
+       }
+
+       f.pageIndexReader = &metadata.PageIndexReader{
+               Input:        f.r,
+               Props:        f.props,
+               FileMetadata: f.metadata,
+               Decryptor:    f.fileDecryptor,
        }
 
        return f, nil
@@ -305,23 +314,16 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
        rg := f.metadata.RowGroups[i]
 
        return &RowGroupReader{
-               fileMetadata:  f.metadata,
-               rgMetadata:    metadata.NewRowGroupMetaData(rg, 
f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
-               props:         f.props,
-               r:             f.r,
-               fileDecryptor: f.fileDecryptor,
-               bufferPool:    &f.bufferPool,
+               fileMetadata:    f.metadata,
+               rgMetadata:      metadata.NewRowGroupMetaData(rg, 
f.metadata.Schema, f.WriterVersion(), f.fileDecryptor),
+               props:           f.props,
+               r:               f.r,
+               fileDecryptor:   f.fileDecryptor,
+               bufferPool:      &f.bufferPool,
+               pageIndexReader: f.pageIndexReader,
        }
 }
 
 func (f *Reader) GetPageIndexReader() *metadata.PageIndexReader {
-       if f.pageIndexReader == nil {
-               f.pageIndexReader = &metadata.PageIndexReader{
-                       Input:        f.r,
-                       Props:        f.props,
-                       FileMetadata: f.metadata,
-                       Decryptor:    f.fileDecryptor,
-               }
-       }
        return f.pageIndexReader
 }
diff --git a/parquet/file/file_reader_mmap_windows.go 
b/parquet/file/file_reader_mmap_windows.go
index 512370f..92e9620 100644
--- a/parquet/file/file_reader_mmap_windows.go
+++ b/parquet/file/file_reader_mmap_windows.go
@@ -25,6 +25,6 @@ import (
        "github.com/apache/arrow-go/v18/parquet"
 )
 
-func mmapOpen(filename string) (parquet.ReaderAtSeeker, error) {
+func mmapOpen(_ string) (parquet.ReaderAtSeeker, error) {
        return nil, errors.New("mmap not implemented on windows")
 }
diff --git a/parquet/file/page_reader.go b/parquet/file/page_reader.go
index 2226b84..d3639dd 100644
--- a/parquet/file/page_reader.go
+++ b/parquet/file/page_reader.go
@@ -18,12 +18,15 @@ package file
 
 import (
        "bytes"
+       "errors"
        "fmt"
        "io"
+       "sort"
        "sync"
 
        "github.com/JohnCGriffin/overflow"
        "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/utils"
        "github.com/apache/arrow-go/v18/parquet"
        "github.com/apache/arrow-go/v18/parquet/compress"
        "github.com/apache/arrow-go/v18/parquet/internal/encryption"
@@ -47,6 +50,10 @@ type PageReader interface {
        Err() error
        // Reset allows reusing a page reader
        Reset(r parquet.BufferedReader, nrows int64, compressType 
compress.Compression, ctx *CryptoContext)
+
+       // Get the dictionary page for this column chunk
+       GetDictionaryPage() (*DictionaryPage, error)
+       SeekToPageWithRow(rowIdx int64) error
 }
 
 type PageType = format.PageType
@@ -342,7 +349,11 @@ func (d *DictionaryPage) Release() {
 func (d *DictionaryPage) IsSorted() bool { return d.sorted }
 
 type serializedPageReader struct {
-       r        parquet.BufferedReader
+       r             parquet.BufferedReader
+       chunk         *metadata.ColumnChunkMetaData
+       colIdx        int
+       pgIndexReader *metadata.RowGroupPageIndexReader
+
        nrows    int64
        rowsSeen int64
        mem      memory.Allocator
@@ -357,11 +368,46 @@ type serializedPageReader struct {
        dataPageAad       string
        dataPageHeaderAad string
 
+       baseOffset, dataOffset, dictOffset int64
+
        decompressBuffer bytes.Buffer
        err              error
 }
 
+// init prepares a freshly constructed page reader for use. It defaults the
+// allocator to the Go allocator when none was supplied, resolves the codec
+// for compressType, grows the decompression buffer when its capacity is
+// below defaultPageHeaderSize, sets up decryption when ctx is non-nil, and
+// records the chunk offsets that later seeks are computed from.
+//
+// It returns the error reported by compress.GetCodec, if any.
+func (p *serializedPageReader) init(compressType compress.Compression, ctx *CryptoContext) error {
+       if p.mem == nil {
+               p.mem = memory.NewGoAllocator()
+       }
+
+       codec, err := compress.GetCodec(compressType)
+       if err != nil {
+               return err
+       }
+       p.codec = codec
+
+       if p.decompressBuffer.Cap() < defaultPageHeaderSize {
+               p.decompressBuffer.Grow(defaultPageHeaderSize - p.decompressBuffer.Cap())
+       }
+
+       if ctx != nil {
+               p.cryptoCtx = *ctx
+               p.initDecryption()
+       }
+
+       // Seek targets are computed relative to baseOffset. The dictionary page
+       // only becomes the base when it precedes the first data page, which is
+       // the same rule GetColumnPageReader applies when choosing where the
+       // column stream starts. Without the ordering check a dictionary offset
+       // larger than the data page offset would make dataOffset-baseOffset
+       // negative and every subsequent seek would fail.
+       p.baseOffset = p.chunk.DataPageOffset()
+       p.dataOffset = p.baseOffset
+       if p.chunk.HasDictionaryPage() {
+               if dictOff := p.chunk.DictionaryPageOffset(); dictOff > 0 && dictOff < p.dataOffset {
+                       p.baseOffset, p.dictOffset = dictOff, dictOff
+               }
+       }
+
+       return nil
+}
+
 // NewPageReader returns a page reader for the data which can be read from the 
provided reader and compression.
+//
+// Deprecated: This function isn't properly safe for public API use and should 
not be utilized
+// anymore. It will be removed from the public interface soon to prevent usage 
outside of this package.
 func NewPageReader(r parquet.BufferedReader, nrows int64, compressType 
compress.Compression, mem memory.Allocator, ctx *CryptoContext) (PageReader, 
error) {
        if mem == nil {
                mem = memory.NewGoAllocator()
@@ -439,9 +485,9 @@ func (p *serializedPageReader) Page() Page {
        return p.curPage
 }
 
-func (p *serializedPageReader) decompress(lenCompressed int, buf []byte) 
([]byte, error) {
+func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf 
[]byte) ([]byte, error) {
        p.decompressBuffer.Grow(lenCompressed)
-       if _, err := io.CopyN(&p.decompressBuffer, p.r, int64(lenCompressed)); 
err != nil {
+       if _, err := io.CopyN(&p.decompressBuffer, rd, int64(lenCompressed)); 
err != nil {
                return nil, err
        }
 
@@ -482,6 +528,149 @@ func extractStats(dataHeader dataheader) (pageStats 
metadata.EncodedStatistics)
        return
 }
 
+// GetDictionaryPage reads the dictionary page of this column chunk and
+// returns it, or returns (nil, nil) when no dictionary page offset was
+// recorded for the chunk.
+//
+// The page is read through an io.SectionReader layered over p.r.Outer()
+// rather than by consuming bytes from p.r itself. The reader's decompression
+// buffer and decryption state are modified while the page is decoded.
+//
+// An error is returned when the header cannot be read, is not a dictionary
+// page header, carries negative sizes or value counts, or when the
+// decompressed payload length disagrees with the header.
+func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
+       if p.dictOffset <= 0 {
+               return nil, nil
+       }
+
+       section := io.NewSectionReader(p.r.Outer(), p.dictOffset-p.baseOffset, p.dataOffset-p.baseOffset)
+       rd := utils.NewBufferedReader(section, p.r.BufferSize())
+
+       hdr := format.NewPageHeader()
+       if err := p.readPageHeader(rd, hdr); err != nil {
+               return nil, err
+       }
+
+       dictHeader := hdr.GetDictionaryPageHeader()
+       if dictHeader == nil {
+               return nil, errors.New("parquet: invalid dictionary page header")
+       }
+
+       p.cryptoCtx.StartDecryptWithDictionaryPage = true
+       p.decompressBuffer.Reset()
+       if p.cryptoCtx.DataDecryptor != nil {
+               p.updateDecryption(p.cryptoCtx.DataDecryptor, encryption.DictPageModule, p.dataPageAad)
+       }
+
+       compressedLen := int(hdr.GetCompressedPageSize())
+       uncompressedLen := int(hdr.GetUncompressedPageSize())
+       if compressedLen < 0 || uncompressedLen < 0 {
+               return nil, errors.New("parquet: invalid page header")
+       }
+
+       p.cryptoCtx.StartDecryptWithDictionaryPage = false
+       numValues := dictHeader.GetNumValues()
+       if numValues < 0 {
+               return nil, errors.New("parquet: invalid page header (negative number of values)")
+       }
+
+       buf := memory.NewResizableBuffer(p.mem)
+       defer buf.Release()
+       buf.ResizeNoShrink(uncompressedLen)
+
+       // NOTE(review): data may alias buf, which is released when this function
+       // returns - confirm decompress hands back memory that outlives buf.
+       data, err := p.decompress(rd, compressedLen, buf.Bytes())
+       if err != nil {
+               return nil, err
+       }
+       if got := len(data); got != uncompressedLen {
+               return nil, fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", uncompressedLen, got)
+       }
+
+       sorted := dictHeader.IsSetIsSorted() && dictHeader.GetIsSorted()
+       return &DictionaryPage{
+               page: page{
+                       buf:      memory.NewBufferBytes(data),
+                       typ:      hdr.Type,
+                       nvals:    numValues,
+                       encoding: dictHeader.GetEncoding(),
+               },
+               sorted: sorted,
+       }, nil
+}
+
+// readPageHeader deserializes the next thrift page header from rd into hdr
+// and advances rd past the bytes the header occupied.
+//
+// It starts by peeking defaultPageHeaderSize bytes and doubles the window each
+// time deserialization fails, giving up once the window would exceed
+// p.maxPageHeaderSize. When a metadata decryptor is configured the peeked
+// bytes are decrypted before being deserialized.
+//
+// It returns io.EOF when rd has no bytes left, and otherwise any error from
+// peeking, deserializing or discarding.
+func (p *serializedPageReader) readPageHeader(rd parquet.BufferedReader, hdr *format.PageHeader) error {
+       allowedPgSz := defaultPageHeaderSize
+       for {
+               view, err := rd.Peek(allowedPgSz)
+               if err != nil && err != io.EOF {
+                       return err
+               }
+
+               if len(view) == 0 {
+                       return io.EOF
+               }
+
+               extra := 0
+               if p.cryptoCtx.MetaDecryptor != nil {
+                       p.updateDecryption(p.cryptoCtx.MetaDecryptor, encryption.DictPageHeaderModule, p.dataPageHeaderAad)
+                       view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
+                       extra = p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta()
+               }
+
+               remaining, err := thrift.DeserializeThrift(hdr, view)
+               if err != nil {
+                       // The header may simply be larger than the current window;
+                       // retry with twice as many bytes until the limit is hit.
+                       allowedPgSz *= 2
+                       if allowedPgSz > p.maxPageHeaderSize {
+                               return errors.New("parquet: deserializing page header failed")
+                       }
+                       continue
+               }
+
+               // Skip the header bytes (plus any ciphertext overhead) so rd is
+               // left at the start of the page payload. A failed discard means
+               // the stream is not where the caller expects, so report it
+               // instead of dropping the error.
+               _, err = rd.Discard(len(view) - int(remaining) + extra)
+               return err
+       }
+}
+
+// SeekToPageWithRow repositions the reader for a read starting at rowIdx and
+// loads a page via Next.
+//
+// When an offset index is available for the column, the last page whose
+// FirstRowIndex is not greater than rowIdx is found by binary search and the
+// stream seeks straight to it. Otherwise the stream is rewound to the first
+// data page and pages are read sequentially.
+//
+// It returns an error when rowIdx is negative or not below p.nrows, when the
+// offset index cannot be loaded, when no page location matches, or when
+// seeking or reading a page fails.
+func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error {
+       if rowIdx < 0 || rowIdx >= p.nrows {
+               return fmt.Errorf("parquet: cannot seek column reader to row index %d", rowIdx)
+       }
+
+       var (
+               oidx metadata.OffsetIndex
+               err  error
+       )
+
+       if p.pgIndexReader != nil {
+               oidx, err = p.pgIndexReader.GetOffsetIndex(p.colIdx)
+               if err != nil {
+                       return err
+               }
+       }
+
+       section := p.r.Outer()
+       if oidx == nil {
+               // No offset index: restart from the first data page and scan.
+               if _, err = section.Seek(p.dataOffset-p.baseOffset, io.SeekStart); err != nil {
+                       return err
+               }
+               p.r.Reset(section)
+
+               p.rowsSeen = 0
+               p.pageOrd = 0
+
+               // NOTE(review): the scan stops once rowsSeen reaches rowIdx, so
+               // when rowIdx is exactly the first row of a page the reader is
+               // left on the preceding page - confirm callers skip forward
+               // from that page's first row index.
+               for p.Next() && p.rowsSeen < rowIdx {
+               }
+               return p.err
+       }
+
+       // sort.Search yields the first page starting after rowIdx; the page
+       // just before it is the one to seek to.
+       pages := oidx.GetPageLocations()
+       index := sort.Search(len(pages), func(i int) bool {
+               return pages[i].FirstRowIndex > rowIdx
+       }) - 1
+
+       if index < 0 {
+               return errors.New("parquet: seek out of range")
+       }
+
+       if _, err = section.Seek(pages[index].GetOffset()-p.baseOffset, io.SeekStart); err != nil {
+               return err
+       }
+
+       p.r.Reset(section)
+       p.rowsSeen, p.pageOrd = pages[index].FirstRowIndex, int16(index)
+       p.Next()
+       return p.err
+}
+
 func (p *serializedPageReader) Next() bool {
        // Loop here because there may be unhandled page types that we skip 
until
        // finding a page that we do know what to do with
@@ -493,44 +682,19 @@ func (p *serializedPageReader) Next() bool {
        p.err = nil
 
        for p.rowsSeen < p.nrows {
-               allowedPgSz := defaultPageHeaderSize
                p.decompressBuffer.Reset()
-               for {
-                       view, err := p.r.Peek(allowedPgSz)
-                       if err != nil && err != io.EOF {
+               if err := p.readPageHeader(p.r, p.curPageHdr); err != nil {
+                       if err != io.EOF {
                                p.err = err
-                               return false
                        }
 
-                       if len(view) == 0 {
-                               return false
-                       }
-
-                       extra := 0
-                       if p.cryptoCtx.MetaDecryptor != nil {
-                               p.updateDecryption(p.cryptoCtx.MetaDecryptor, 
encryption.DictPageHeaderModule, p.dataPageHeaderAad)
-                               view = p.cryptoCtx.MetaDecryptor.Decrypt(view)
-                               extra = 
p.cryptoCtx.MetaDecryptor.CiphertextSizeDelta()
-                       }
-
-                       remaining, err := 
thrift.DeserializeThrift(p.curPageHdr, view)
-                       if err != nil {
-                               allowedPgSz *= 2
-                               if allowedPgSz > p.maxPageHeaderSize {
-                                       p.err = xerrors.New("parquet: 
deserializing page header failed")
-                                       return false
-                               }
-                               continue
-                       }
-
-                       p.r.Discard(len(view) - int(remaining) + extra)
-                       break
+                       return false
                }
 
                lenCompressed := int(p.curPageHdr.GetCompressedPageSize())
                lenUncompressed := int(p.curPageHdr.GetUncompressedPageSize())
                if lenCompressed < 0 || lenUncompressed < 0 {
-                       p.err = xerrors.New("parquet: invalid page header")
+                       p.err = errors.New("parquet: invalid page header")
                        return false
                }
 
@@ -551,7 +715,7 @@ func (p *serializedPageReader) Next() bool {
                                return false
                        }
 
-                       data, err := p.decompress(lenCompressed, buf.Bytes())
+                       data, err := p.decompress(p.r, lenCompressed, 
buf.Bytes())
                        if err != nil {
                                p.err = err
                                return false
@@ -580,8 +744,9 @@ func (p *serializedPageReader) Next() bool {
                                return false
                        }
 
+                       firstRowIdx := p.rowsSeen
                        p.rowsSeen += int64(dataHeader.GetNumValues())
-                       data, err := p.decompress(lenCompressed, buf.Bytes())
+                       data, err := p.decompress(p.r, lenCompressed, 
buf.Bytes())
                        if err != nil {
                                p.err = err
                                return false
@@ -603,6 +768,7 @@ func (p *serializedPageReader) Next() bool {
                                repLvlEncoding:   
dataHeader.GetRepetitionLevelEncoding(),
                                uncompressedSize: int32(lenUncompressed),
                                statistics:       extractStats(dataHeader),
+                               firstRowIndex:    firstRowIdx,
                        }
                case format.PageType_DATA_PAGE_V2:
                        p.pageOrd++
@@ -619,7 +785,8 @@ func (p *serializedPageReader) Next() bool {
 
                        compressed := dataHeader.GetIsCompressed()
                        // extract stats
-                       p.rowsSeen += int64(dataHeader.GetNumValues())
+                       firstRowIdx := p.rowsSeen
+                       p.rowsSeen += int64(dataHeader.GetNumRows())
                        levelsBytelen, ok := 
overflow.Add(int(dataHeader.GetDefinitionLevelsByteLength()), 
int(dataHeader.GetRepetitionLevelsByteLength()))
                        if !ok {
                                p.err = xerrors.New("parquet: levels size too 
large (corrupt file?)")
@@ -630,7 +797,7 @@ func (p *serializedPageReader) Next() bool {
                                if levelsBytelen > 0 {
                                        io.ReadFull(p.r, 
buf.Bytes()[:levelsBytelen])
                                }
-                               if _, p.err = 
p.decompress(lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err 
!= nil {
+                               if _, p.err = p.decompress(p.r, 
lenCompressed-levelsBytelen, buf.Bytes()[levelsBytelen:]); p.err != nil {
                                        return false
                                }
                        } else {
@@ -658,11 +825,13 @@ func (p *serializedPageReader) Next() bool {
                                compressed:       compressed,
                                uncompressedSize: int32(lenUncompressed),
                                statistics:       extractStats(dataHeader),
+                               firstRowIndex:    firstRowIdx,
                        }
                default:
                        // we don't know this page type, we're allowed to skip 
non-data pages
                        continue
                }
+
                return true
        }
 
diff --git a/parquet/file/record_reader.go b/parquet/file/record_reader.go
index d8c627d..87c2713 100644
--- a/parquet/file/record_reader.go
+++ b/parquet/file/record_reader.go
@@ -132,12 +132,16 @@ type primitiveRecordReader struct {
 
 func createPrimitiveRecordReader(descr *schema.Column, mem memory.Allocator, 
bufferPool *sync.Pool) primitiveRecordReader {
        return primitiveRecordReader{
-               ColumnChunkReader: NewColumnReader(descr, nil, mem, bufferPool),
-               values:            memory.NewResizableBuffer(mem),
-               validBits:         memory.NewResizableBuffer(mem),
-               mem:               mem,
-               refCount:          1,
-               useValues:         descr.PhysicalType() != 
parquet.Types.ByteArray && descr.PhysicalType() != 
parquet.Types.FixedLenByteArray,
+               ColumnChunkReader: newTypedColumnChunkReader(columnChunkReader{
+                       descr:      descr,
+                       mem:        mem,
+                       bufferPool: bufferPool,
+               }),
+               values:    memory.NewResizableBuffer(mem),
+               validBits: memory.NewResizableBuffer(mem),
+               mem:       mem,
+               refCount:  1,
+               useValues: descr.PhysicalType() != parquet.Types.ByteArray && 
descr.PhysicalType() != parquet.Types.FixedLenByteArray,
        }
 }
 
diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go
index f800d19..acfac0e 100644
--- a/parquet/file/row_group_reader.go
+++ b/parquet/file/row_group_reader.go
@@ -22,7 +22,9 @@ import (
 
        "github.com/apache/arrow-go/v18/internal/utils"
        "github.com/apache/arrow-go/v18/parquet"
+       "github.com/apache/arrow-go/v18/parquet/internal/encoding"
        "github.com/apache/arrow-go/v18/parquet/internal/encryption"
+       format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet"
        "github.com/apache/arrow-go/v18/parquet/metadata"
        "golang.org/x/xerrors"
 )
@@ -39,7 +41,9 @@ type RowGroupReader struct {
        props         *parquet.ReaderProperties
        fileDecryptor encryption.FileDecryptor
 
-       bufferPool *sync.Pool
+       pageIndexReader   *metadata.PageIndexReader
+       rgPageIndexReader *metadata.RowGroupPageIndexReader
+       bufferPool        *sync.Pool
 }
 
 // MetaData returns the metadata of the current Row Group
@@ -67,7 +71,13 @@ func (r *RowGroupReader) Column(i int) (ColumnChunkReader, 
error) {
        if err != nil {
                return nil, fmt.Errorf("parquet: unable to initialize page 
reader: %w", err)
        }
-       return NewColumnReader(descr, pageRdr, r.props.Allocator(), 
r.bufferPool), nil
+       return newTypedColumnChunkReader(columnChunkReader{
+               descr:      descr,
+               rdr:        pageRdr,
+               mem:        r.props.Allocator(),
+               bufferPool: r.bufferPool,
+               decoders:   make(map[format.Encoding]encoding.TypedDecoder),
+       }), nil
 }
 
 func (r *RowGroupReader) GetColumnPageReader(i int) (PageReader, error) {
@@ -76,6 +86,14 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
                return nil, err
        }
 
+       if r.rgPageIndexReader == nil {
+               rgIdx, err := 
r.pageIndexReader.RowGroup(int(r.rgMetadata.Ordinal()))
+               if err != nil {
+                       return nil, err
+               }
+               r.rgPageIndexReader = rgIdx
+       }
+
        colStart := col.DataPageOffset()
        if col.HasDictionaryPage() && col.DictionaryPageOffset() > 0 && 
colStart > col.DictionaryPageOffset() {
                colStart = col.DictionaryPageOffset()
@@ -106,7 +124,16 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
 
        cryptoMetadata := col.CryptoMetadata()
        if cryptoMetadata == nil {
-               return NewPageReader(stream, col.NumValues(), 
col.Compression(), r.props.Allocator(), nil)
+               pr := &serializedPageReader{
+                       r:                 stream,
+                       chunk:             col,
+                       colIdx:            i,
+                       pgIndexReader:     r.rgPageIndexReader,
+                       maxPageHeaderSize: defaultMaxPageHeaderSize,
+                       nrows:             col.NumValues(),
+                       mem:               r.props.Allocator(),
+               }
+               return pr, pr.init(col.Compression(), nil)
        }
 
        if r.fileDecryptor == nil {
@@ -126,7 +153,17 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
                        MetaDecryptor:                  
r.fileDecryptor.GetFooterDecryptorForColumnMeta(""),
                        DataDecryptor:                  
r.fileDecryptor.GetFooterDecryptorForColumnData(""),
                }
-               return NewPageReader(stream, col.NumValues(), 
col.Compression(), r.props.Allocator(), &ctx)
+               pr := &serializedPageReader{
+                       r:                 stream,
+                       chunk:             col,
+                       colIdx:            i,
+                       pgIndexReader:     r.rgPageIndexReader,
+                       maxPageHeaderSize: defaultMaxPageHeaderSize,
+                       nrows:             col.NumValues(),
+                       mem:               r.props.Allocator(),
+                       cryptoCtx:         ctx,
+               }
+               return pr, pr.init(col.Compression(), &ctx)
        }
 
        // column encrypted with it's own key
@@ -140,5 +177,15 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
                MetaDecryptor:                  
r.fileDecryptor.GetColumnMetaDecryptor(parquet.ColumnPath(columnPath).String(), 
string(columnKeyMeta), ""),
                DataDecryptor:                  
r.fileDecryptor.GetColumnDataDecryptor(parquet.ColumnPath(columnPath).String(), 
string(columnKeyMeta), ""),
        }
-       return NewPageReader(stream, col.NumValues(), col.Compression(), 
r.props.Allocator(), &ctx)
+       pr := &serializedPageReader{
+               r:                 stream,
+               chunk:             col,
+               colIdx:            i,
+               pgIndexReader:     r.rgPageIndexReader,
+               maxPageHeaderSize: defaultMaxPageHeaderSize,
+               nrows:             col.NumValues(),
+               mem:               r.props.Allocator(),
+               cryptoCtx:         ctx,
+       }
+       return pr, pr.init(col.Compression(), &ctx)
 }
diff --git a/parquet/internal/testutils/pagebuilder.go 
b/parquet/internal/testutils/pagebuilder.go
index eb3abc4..41c2ba6 100644
--- a/parquet/internal/testutils/pagebuilder.go
+++ b/parquet/internal/testutils/pagebuilder.go
@@ -173,7 +173,7 @@ func (d *DictionaryPageBuilder) NumValues() int32 {
        return d.numDictValues
 }
 
-func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, 
values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer, 
defLvls, repLvls []int16, maxDef, maxRep int16) file.Page {
+func MakeDataPage(dataPageVersion parquet.DataPageVersion, d *schema.Column, 
values interface{}, nvals int, e parquet.Encoding, indexBuffer encoding.Buffer, 
defLvls, repLvls []int16, maxDef, maxRep int16, firstRowIdx int64) file.Page {
        num := 0
 
        stream := encoding.NewBufferWriter(1024, mem)
@@ -196,9 +196,19 @@ func MakeDataPage(dataPageVersion parquet.DataPageVersion, 
d *schema.Column, val
 
        buf := stream.Finish()
        if dataPageVersion == parquet.DataPageV1 {
-               return file.NewDataPageV1(buf, int32(num), e, 
builder.defLvlEncoding, builder.repLvlEncoding, int32(buf.Len()))
+               return file.NewDataPageV1WithConfig(buf, 
builder.defLvlEncoding, builder.repLvlEncoding, file.DataPageConfig{
+                       Num:              int32(num),
+                       Encoding:         e,
+                       UncompressedSize: int32(buf.Len()),
+                       FirstRowIndex:    firstRowIdx,
+               })
        }
-       return file.NewDataPageV2(buf, int32(num), 0, int32(num), e, 
int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), int32(buf.Len()), 
false)
+       return file.NewDataPageV2WithConfig(buf, 0, int32(num), 
int32(builder.defLvlBytesLen), int32(builder.repLvlBytesLen), false, 
file.DataPageConfig{
+               Num:              int32(num),
+               Encoding:         e,
+               UncompressedSize: int32(buf.Len()),
+               FirstRowIndex:    firstRowIdx,
+       })
 }
 
 func MakeDictPage(d *schema.Column, values interface{}, valuesPerPage []int, e 
parquet.Encoding) (*file.DictionaryPage, []encoding.Buffer) {
@@ -237,6 +247,26 @@ func (m *MockPageReader) Page() file.Page {
        return m.TestData().Get("pages").Data().([]file.Page)[m.curpage-1]
 }
 
+func (m *MockPageReader) SeekToPageWithRow(rowIdx int64) error {
+       pg1 := m.TestData().Get("pages").Data().([]file.Page)[0]
+       _, hasDict := pg1.(*file.DictionaryPage)
+       m.curpage = m.TestData().Get("row_map").Data().(func(int64) int)(rowIdx)
+       if hasDict {
+               m.curpage++
+       }
+
+       return nil
+}
+
+func (m *MockPageReader) GetDictionaryPage() (*file.DictionaryPage, error) {
+       pg1 := m.TestData().Get("pages").Data().([]file.Page)[0]
+       if dict, ok := pg1.(*file.DictionaryPage); ok {
+               return dict, nil
+       }
+
+       return nil, nil
+}
+
 func (m *MockPageReader) Next() bool {
        pageList := m.TestData().Get("pages").Data().([]file.Page)
        m.curpage++
@@ -269,7 +299,7 @@ func PaginatePlain(version parquet.DataPageVersion, d 
*schema.Column, values ref
                page := MakeDataPage(version, d,
                        values.Slice(valueStart, 
valueStart+valuesPerPage[i]).Interface(),
                        valuesPerPage[i], enc, nil, 
defLevels[defLvlStart:defLvlEnd],
-                       repLevels[repLvlStart:repLvlEnd], maxDef, maxRep)
+                       repLevels[repLvlStart:repLvlEnd], maxDef, maxRep, 
int64(i*lvlsPerPage))
                valueStart += valuesPerPage[i]
                pageList = append(pageList, page)
        }
@@ -298,7 +328,7 @@ func PaginateDict(version parquet.DataPageVersion, d 
*schema.Column, values refl
                        repEnd = (i + 1) * lvlsPerPage
                }
                page := MakeDataPage(version, d, nil, valuesPerPage[i], enc, 
rleIndices[i],
-                       defLevels[defStart:defEnd], repLevels[repStart:repEnd], 
maxDef, maxRep)
+                       defLevels[defStart:defEnd], repLevels[repStart:repEnd], 
maxDef, maxRep, int64(i*lvlsPerPage))
                pages = append(pages, page)
        }
        return pages
diff --git a/parquet/metadata/page_index.go b/parquet/metadata/page_index.go
index 673f9ff..497a260 100644
--- a/parquet/metadata/page_index.go
+++ b/parquet/metadata/page_index.go
@@ -20,6 +20,7 @@ import (
        "fmt"
        "io"
        "math"
+       "sync"
 
        "github.com/JohnCGriffin/overflow"
        "github.com/apache/arrow-go/v18/arrow"
@@ -293,6 +294,12 @@ type RowGroupPageIndexReader struct {
        // buffers to hold raw bytes of page index
        // will be lazily set when the corresponding page index is accessed
        colIndexBuffer, offsetIndexBuffer []byte
+
+       // cache of column indexes
+       colIndexes map[int]ColumnIndex
+       // cache of offset indices
+       offsetIndices map[int]OffsetIndex
+       mx            sync.Mutex
 }
 
 func (r *RowGroupPageIndexReader) GetColumnIndex(i int) (ColumnIndex, error) {
@@ -301,6 +308,17 @@ func (r *RowGroupPageIndexReader) GetColumnIndex(i int) 
(ColumnIndex, error) {
                        arrow.ErrInvalid, i)
        }
 
+       r.mx.Lock()
+       defer r.mx.Unlock()
+
+       if r.colIndexes == nil {
+               r.colIndexes = make(map[int]ColumnIndex)
+       } else {
+               if idx, ok := r.colIndexes[i]; ok {
+                       return idx, nil
+               }
+       }
+
        colChunk, err := r.rowGroupMetadata.ColumnChunk(i)
        if err != nil {
                return nil, err
@@ -334,7 +352,9 @@ func (r *RowGroupPageIndexReader) GetColumnIndex(i int) 
(ColumnIndex, error) {
                        int16(i), encryption.ColumnIndexModule)
        }
 
-       return NewColumnIndex(descr, r.colIndexBuffer[bufferOffset:], r.props, 
decryptor), nil
+       idx := NewColumnIndex(descr, r.colIndexBuffer[bufferOffset:], r.props, 
decryptor)
+       r.colIndexes[i] = idx
+       return idx, nil
 }
 
 func (r *RowGroupPageIndexReader) GetOffsetIndex(i int) (OffsetIndex, error) {
@@ -343,6 +363,17 @@ func (r *RowGroupPageIndexReader) GetOffsetIndex(i int) 
(OffsetIndex, error) {
                        arrow.ErrInvalid, i)
        }
 
+       r.mx.Lock()
+       defer r.mx.Unlock()
+
+       if r.offsetIndices == nil {
+               r.offsetIndices = make(map[int]OffsetIndex)
+       } else {
+               if idx, ok := r.offsetIndices[i]; ok {
+                       return idx, nil
+               }
+       }
+
        colChunk, err := r.rowGroupMetadata.ColumnChunk(i)
        if err != nil {
                return nil, err
@@ -375,7 +406,9 @@ func (r *RowGroupPageIndexReader) GetOffsetIndex(i int) 
(OffsetIndex, error) {
                        int16(i), encryption.OffsetIndexModule)
        }
 
-       return NewOffsetIndex(r.offsetIndexBuffer[bufferOffset:], r.props, 
decryptor), nil
+       oidx := NewOffsetIndex(r.offsetIndexBuffer[bufferOffset:], r.props, 
decryptor)
+       r.offsetIndices[i] = oidx
+       return oidx, nil
 }
 
 // PageIndexReader is a read-only object for retrieving the Column and Offset 
indexes
@@ -412,36 +445,27 @@ func determinePageIndexRangesInRowGroup(rgMeta 
*RowGroupMetaData, cols []int32)
 
        var colChunk *ColumnChunkMetaData
        if len(cols) == 0 {
+               cols = make([]int32, rgMeta.NumColumns())
                for i := 0; i < rgMeta.NumColumns(); i++ {
-                       if colChunk, err = rgMeta.ColumnChunk(i); err != nil {
-                               return
-                       }
-
-                       if err = mergeRange(colChunk.GetColumnIndexLocation(), 
&ciStart, &ciEnd); err != nil {
-                               return
-                       }
+                       cols[i] = int32(i)
+               }
+       }
 
-                       if err = mergeRange(colChunk.GetOffsetIndexLocation(), 
&oiStart, &oiEnd); err != nil {
-                               return
-                       }
+       for _, i := range cols {
+               if i < 0 || i >= int32(rgMeta.NumColumns()) {
+                       return rng, fmt.Errorf("%w: invalid column ordinal %d", 
arrow.ErrIndex, i)
                }
-       } else {
-               for _, i := range cols {
-                       if i < 0 || i >= int32(rgMeta.NumColumns()) {
-                               return rng, fmt.Errorf("%w: invalid column 
ordinal %d", arrow.ErrIndex, i)
-                       }
 
-                       if colChunk, err = rgMeta.ColumnChunk(int(i)); err != 
nil {
-                               return
-                       }
+               if colChunk, _ = rgMeta.ColumnChunk(int(i)); colChunk == nil {
+                       continue
+               }
 
-                       if err = mergeRange(colChunk.GetColumnIndexLocation(), 
&ciStart, &ciEnd); err != nil {
-                               return
-                       }
+               if err = mergeRange(colChunk.GetColumnIndexLocation(), 
&ciStart, &ciEnd); err != nil {
+                       return
+               }
 
-                       if err = mergeRange(colChunk.GetOffsetIndexLocation(), 
&oiStart, &oiEnd); err != nil {
-                               return
-                       }
+               if err = mergeRange(colChunk.GetOffsetIndexLocation(), 
&oiStart, &oiEnd); err != nil {
+                       return
                }
        }
 
diff --git a/parquet/reader_properties.go b/parquet/reader_properties.go
index c9b72f9..87d4b7c 100644
--- a/parquet/reader_properties.go
+++ b/parquet/reader_properties.go
@@ -50,6 +50,9 @@ type ReaderProperties struct {
 type BufferedReader interface {
        Peek(int) ([]byte, error)
        Discard(int) (int, error)
+       Outer() utils.Reader
+       BufferSize() int
+       Reset(utils.Reader)
        io.Reader
 }
 

Reply via email to