This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new 3e8e919 feat(parquet/pqarrow): parallelize SeekToRow (#380)
3e8e919 is described below
commit 3e8e919cd4efdd22c413034502e57cbed5c13ac7
Author: Matt Topol <[email protected]>
AuthorDate: Thu May 22 11:36:58 2025 -0400
feat(parquet/pqarrow): parallelize SeekToRow (#380)
### Rationale for this change
Closes #379
### What changes are included in this PR?
Update the record reader's `SeekToRow` method so that, when the `Parallel`
option is set to true, the per-column `SeekToRow` calls run concurrently
instead of one after another.
### Are these changes tested?
Yes. The RecordReader `SeekToRow` unit test already covers multiple
columns, and it now runs with `Parallel` set to both false and true.
### Are there any user-facing changes?
None beyond the performance benefit when the `Parallel` option is enabled.
---
parquet/pqarrow/file_reader.go | 17 ++++++--
parquet/pqarrow/file_reader_test.go | 86 +++++++++++++++++++------------------
2 files changed, 58 insertions(+), 45 deletions(-)
diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index df73613..b064107 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -697,13 +697,22 @@ func (r *recordReader) SeekToRow(row int64) error {
return fmt.Errorf("invalid row index %d, file only has %d rows", row, r.numRows)
}
+ var (
+ np = 1
+ g errgroup.Group
+ )
+
+ if r.parallel {
+ np = len(r.fieldReaders)
+ }
+
+ g.SetLimit(np)
for _, fr := range r.fieldReaders {
- if err := fr.SeekToRow(row); err != nil {
- return err
- }
+ fr := fr
+ g.Go(func() error { return fr.SeekToRow(row) })
}
- return nil
+ return g.Wait()
}
func (r *recordReader) Retain() {
diff --git a/parquet/pqarrow/file_reader_test.go b/parquet/pqarrow/file_reader_test.go
index bca5164..1745f0a 100644
--- a/parquet/pqarrow/file_reader_test.go
+++ b/parquet/pqarrow/file_reader_test.go
@@ -295,49 +295,53 @@ func TestRecordReaderSeekToRow(t *testing.T) {
var buf bytes.Buffer
require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil,
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
- pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()),
file.WithReadProps(parquet.NewReaderProperties(mem)))
- require.NoError(t, err)
-
- reader, err := pqarrow.NewFileReader(pf,
pqarrow.ArrowReadProperties{BatchSize: 2}, mem)
- require.NoError(t, err)
-
- sc, err := reader.Schema()
- assert.NoError(t, err)
- assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s",
tbl.Schema(), sc)
-
- rr, err := reader.GetRecordReader(context.Background(), nil, nil)
- assert.NoError(t, err)
- assert.NotNil(t, rr)
- defer rr.Release()
-
- tr := array.NewTableReader(tbl, 2)
- defer tr.Release()
-
- rec, err := rr.Read()
- assert.NoError(t, err)
- tr.Next()
- assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected:
%s\ngot: %s", tr.Record(), rec)
-
- require.NoError(t, rr.SeekToRow(0))
- rec, err = rr.Read()
- assert.NoError(t, err)
- assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected:
%s\ngot: %s", tr.Record(), rec)
-
- rec, err = rr.Read()
- assert.NoError(t, err)
- tr.Next()
- assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected:
%s\ngot: %s", tr.Record(), rec)
+ for _, parallel := range []bool{false, true} {
+ t.Run(fmt.Sprintf("parallel=%v", parallel), func(t *testing.T) {
+ pf, err :=
file.NewParquetReader(bytes.NewReader(buf.Bytes()),
file.WithReadProps(parquet.NewReaderProperties(mem)))
+ require.NoError(t, err)
- require.NoError(t, rr.SeekToRow(2))
- rec, err = rr.Read()
- assert.NoError(t, err)
- assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected:
%s\ngot: %s", tr.Record(), rec)
+ reader, err := pqarrow.NewFileReader(pf,
pqarrow.ArrowReadProperties{BatchSize: 2, Parallel: parallel}, mem)
+ require.NoError(t, err)
- require.NoError(t, rr.SeekToRow(4))
- rec, err = rr.Read()
- tr.Next()
- assert.NoError(t, err)
- assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected:
%s\ngot: %s", tr.Record(), rec)
+ sc, err := reader.Schema()
+ assert.NoError(t, err)
+ assert.Truef(t, tbl.Schema().Equal(sc), "expected:
%s\ngot: %s", tbl.Schema(), sc)
+
+ rr, err := reader.GetRecordReader(context.Background(),
nil, nil)
+ assert.NoError(t, err)
+ assert.NotNil(t, rr)
+ defer rr.Release()
+
+ tr := array.NewTableReader(tbl, 2)
+ defer tr.Release()
+
+ rec, err := rr.Read()
+ assert.NoError(t, err)
+ tr.Next()
+ assert.Truef(t, array.RecordEqual(tr.Record(), rec),
"expected: %s\ngot: %s", tr.Record(), rec)
+
+ require.NoError(t, rr.SeekToRow(0))
+ rec, err = rr.Read()
+ assert.NoError(t, err)
+ assert.Truef(t, array.RecordEqual(tr.Record(), rec),
"expected: %s\ngot: %s", tr.Record(), rec)
+
+ rec, err = rr.Read()
+ assert.NoError(t, err)
+ tr.Next()
+ assert.Truef(t, array.RecordEqual(tr.Record(), rec),
"expected: %s\ngot: %s", tr.Record(), rec)
+
+ require.NoError(t, rr.SeekToRow(2))
+ rec, err = rr.Read()
+ assert.NoError(t, err)
+ assert.Truef(t, array.RecordEqual(tr.Record(), rec),
"expected: %s\ngot: %s", tr.Record(), rec)
+
+ require.NoError(t, rr.SeekToRow(4))
+ rec, err = rr.Read()
+ tr.Next()
+ assert.NoError(t, err)
+ assert.Truef(t, array.RecordEqual(tr.Record(), rec),
"expected: %s\ngot: %s", tr.Record(), rec)
+ })
+ }
}
func TestRecordReaderMultiRowGroup(t *testing.T) {