This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 4970bb8 fix(parquet/file): restore goroutine safety for reader (#343)
4970bb8 is described below
commit 4970bb819bfbd94332eba98fbff4d2eabebc30db
Author: Matt Topol <[email protected]>
AuthorDate: Fri Apr 11 13:37:59 2025 -0400
fix(parquet/file): restore goroutine safety for reader (#343)
### Rationale for this change
Fixes #342
### What changes are included in this PR?
Use `sync.OnceValues` to initialize the row group page index reader lazily,
on first use, in a goroutine-safe way.
---
parquet/file/file_reader.go | 5 +++++
parquet/file/row_group_reader.go | 17 +++++++----------
2 files changed, 12 insertions(+), 10 deletions(-)
diff --git a/parquet/file/file_reader.go b/parquet/file/file_reader.go
index c566ec2..4025939 100644
--- a/parquet/file/file_reader.go
+++ b/parquet/file/file_reader.go
@@ -322,6 +322,11 @@ func (f *Reader) RowGroup(i int) *RowGroupReader {
fileDecryptor: f.fileDecryptor,
bufferPool: &f.bufferPool,
pageIndexReader: f.pageIndexReader,
+ // don't pre-emptively initialize the row group page index reader
+ // do it on demand, but ensure that it is goroutine safe.
+ rgPageIndexReader: sync.OnceValues(func()
(*metadata.RowGroupPageIndexReader, error) {
+ return f.pageIndexReader.RowGroup(i)
+ }),
}
}
diff --git a/parquet/file/row_group_reader.go b/parquet/file/row_group_reader.go
index acfac0e..ea5f709 100644
--- a/parquet/file/row_group_reader.go
+++ b/parquet/file/row_group_reader.go
@@ -42,7 +42,7 @@ type RowGroupReader struct {
fileDecryptor encryption.FileDecryptor
pageIndexReader *metadata.PageIndexReader
- rgPageIndexReader *metadata.RowGroupPageIndexReader
+ rgPageIndexReader func() (*metadata.RowGroupPageIndexReader, error)
bufferPool *sync.Pool
}
@@ -86,12 +86,9 @@ func (r *RowGroupReader) GetColumnPageReader(i int)
(PageReader, error) {
return nil, err
}
- if r.rgPageIndexReader == nil {
- rgIdx, err :=
r.pageIndexReader.RowGroup(int(r.rgMetadata.Ordinal()))
- if err != nil {
- return nil, err
- }
- r.rgPageIndexReader = rgIdx
+ rgIdxRdr, err := r.rgPageIndexReader()
+ if err != nil {
+ return nil, err
}
colStart := col.DataPageOffset()
@@ -128,7 +125,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int)
(PageReader, error) {
r: stream,
chunk: col,
colIdx: i,
- pgIndexReader: r.rgPageIndexReader,
+ pgIndexReader: rgIdxRdr,
maxPageHeaderSize: defaultMaxPageHeaderSize,
nrows: col.NumValues(),
mem: r.props.Allocator(),
@@ -157,7 +154,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int)
(PageReader, error) {
r: stream,
chunk: col,
colIdx: i,
- pgIndexReader: r.rgPageIndexReader,
+ pgIndexReader: rgIdxRdr,
maxPageHeaderSize: defaultMaxPageHeaderSize,
nrows: col.NumValues(),
mem: r.props.Allocator(),
@@ -181,7 +178,7 @@ func (r *RowGroupReader) GetColumnPageReader(i int)
(PageReader, error) {
r: stream,
chunk: col,
colIdx: i,
- pgIndexReader: r.rgPageIndexReader,
+ pgIndexReader: rgIdxRdr,
maxPageHeaderSize: defaultMaxPageHeaderSize,
nrows: col.NumValues(),
mem: r.props.Allocator(),