This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 4970bb8  fix(parquet/file): restore goroutine safety for reader (#343)
4970bb8 is described below

commit 4970bb819bfbd94332eba98fbff4d2eabebc30db
Author: Matt Topol <[email protected]>
AuthorDate: Fri Apr 11 13:37:59 2025 -0400

    fix(parquet/file): restore goroutine safety for reader (#343)
    
    ### Rationale for this change
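    Fixes #342. `GetColumnPageReader` lazily cached the row group page index
    reader by assigning to a shared `RowGroupReader` field without any
    synchronization, so concurrent calls on the same row group reader raced.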
    
    ### What changes are included in this PR?
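    Use `sync.OnceValues` to initialize the row group page index reader
    lazily, on first use, in a goroutine-safe way. This replaces the
    unsynchronized nil-check-and-assign in `GetColumnPageReader`.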
---
 parquet/file/file_reader.go      |  5 +++++
 parquet/file/row_group_reader.go | 17 +++++++----------
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/parquet/file/file_reader.go b/parquet/file/file_reader.go
index c566ec2..4025939 100644
--- a/parquet/file/file_reader.go
+++ b/parquet/file/file_reader.go
@@ -322,6 +322,11 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
                fileDecryptor:   f.fileDecryptor,
                bufferPool:      &f.bufferPool,
                pageIndexReader: f.pageIndexReader,
+               // don't pre-emptively initialize the row group page index 
reader
+               // do it on demand, but ensure that it is goroutine safe.
+               rgPageIndexReader: sync.OnceValues(func() 
(*metadata.RowGroupPageIndexReader, error) {
+                       return f.pageIndexReader.RowGroup(i)
+               }),
        }
 }
 
diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go
index acfac0e..ea5f709 100644
--- a/parquet/file/row_group_reader.go
+++ b/parquet/file/row_group_reader.go
@@ -42,7 +42,7 @@ type RowGroupReader struct {
        fileDecryptor encryption.FileDecryptor
 
        pageIndexReader   *metadata.PageIndexReader
-       rgPageIndexReader *metadata.RowGroupPageIndexReader
+       rgPageIndexReader func() (*metadata.RowGroupPageIndexReader, error)
        bufferPool        *sync.Pool
 }
 
@@ -86,12 +86,9 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
                return nil, err
        }
 
-       if r.rgPageIndexReader == nil {
-               rgIdx, err := 
r.pageIndexReader.RowGroup(int(r.rgMetadata.Ordinal()))
-               if err != nil {
-                       return nil, err
-               }
-               r.rgPageIndexReader = rgIdx
+       rgIdxRdr, err := r.rgPageIndexReader()
+       if err != nil {
+               return nil, err
        }
 
        colStart := col.DataPageOffset()
@@ -128,7 +125,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
                        r:                 stream,
                        chunk:             col,
                        colIdx:            i,
-                       pgIndexReader:     r.rgPageIndexReader,
+                       pgIndexReader:     rgIdxRdr,
                        maxPageHeaderSize: defaultMaxPageHeaderSize,
                        nrows:             col.NumValues(),
                        mem:               r.props.Allocator(),
@@ -157,7 +154,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
                        r:                 stream,
                        chunk:             col,
                        colIdx:            i,
-                       pgIndexReader:     r.rgPageIndexReader,
+                       pgIndexReader:     rgIdxRdr,
                        maxPageHeaderSize: defaultMaxPageHeaderSize,
                        nrows:             col.NumValues(),
                        mem:               r.props.Allocator(),
@@ -181,7 +178,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int) 
(PageReader, error) {
                r:                 stream,
                chunk:             col,
                colIdx:            i,
-               pgIndexReader:     r.rgPageIndexReader,
+               pgIndexReader:     rgIdxRdr,
                maxPageHeaderSize: defaultMaxPageHeaderSize,
                nrows:             col.NumValues(),
                mem:               r.props.Allocator(),

Reply via email to