This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e8e919  feat(parquet/pqarrow): parallelize SeekToRow (#380)
3e8e919 is described below

commit 3e8e919cd4efdd22c413034502e57cbed5c13ac7
Author: Matt Topol <[email protected]>
AuthorDate: Thu May 22 11:36:58 2025 -0400

    feat(parquet/pqarrow): parallelize SeekToRow (#380)
    
    ### Rationale for this change
    Closes #379
    
    ### What changes are included in this PR?
    Update the record reader's `SeekToRow` method to run the per-column
    `SeekToRow` calls in parallel when the `Parallel` read option is set to
    true.
    
    ### Are these changes tested?
    Yes. The record reader's `SeekToRow` was already covered by unit tests with
    multiple columns; `TestRecordReaderSeekToRow` now runs with both
    `Parallel: false` and `Parallel: true`.
    
    ### Are there any user-facing changes?
    None, other than the expected performance improvement.
---
 parquet/pqarrow/file_reader.go      | 17 ++++++--
 parquet/pqarrow/file_reader_test.go | 86 +++++++++++++++++++------------------
 2 files changed, 58 insertions(+), 45 deletions(-)

diff --git a/parquet/pqarrow/file_reader.go b/parquet/pqarrow/file_reader.go
index df73613..b064107 100644
--- a/parquet/pqarrow/file_reader.go
+++ b/parquet/pqarrow/file_reader.go
@@ -697,13 +697,22 @@ func (r *recordReader) SeekToRow(row int64) error {
                return fmt.Errorf("invalid row index %d, file only has %d 
rows", row, r.numRows)
        }
 
+       var (
+               np = 1
+               g  errgroup.Group
+       )
+
+       if r.parallel {
+               np = len(r.fieldReaders)
+       }
+
+       g.SetLimit(np)
        for _, fr := range r.fieldReaders {
-               if err := fr.SeekToRow(row); err != nil {
-                       return err
-               }
+               fr := fr
+               g.Go(func() error { return fr.SeekToRow(row) })
        }
 
-       return nil
+       return g.Wait()
 }
 
 func (r *recordReader) Retain() {
diff --git a/parquet/pqarrow/file_reader_test.go b/parquet/pqarrow/file_reader_test.go
index bca5164..1745f0a 100644
--- a/parquet/pqarrow/file_reader_test.go
+++ b/parquet/pqarrow/file_reader_test.go
@@ -295,49 +295,53 @@ func TestRecordReaderSeekToRow(t *testing.T) {
        var buf bytes.Buffer
        require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, 
pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))
 
-       pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()), 
file.WithReadProps(parquet.NewReaderProperties(mem)))
-       require.NoError(t, err)
-
-       reader, err := pqarrow.NewFileReader(pf, 
pqarrow.ArrowReadProperties{BatchSize: 2}, mem)
-       require.NoError(t, err)
-
-       sc, err := reader.Schema()
-       assert.NoError(t, err)
-       assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s", 
tbl.Schema(), sc)
-
-       rr, err := reader.GetRecordReader(context.Background(), nil, nil)
-       assert.NoError(t, err)
-       assert.NotNil(t, rr)
-       defer rr.Release()
-
-       tr := array.NewTableReader(tbl, 2)
-       defer tr.Release()
-
-       rec, err := rr.Read()
-       assert.NoError(t, err)
-       tr.Next()
-       assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: 
%s\ngot: %s", tr.Record(), rec)
-
-       require.NoError(t, rr.SeekToRow(0))
-       rec, err = rr.Read()
-       assert.NoError(t, err)
-       assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: 
%s\ngot: %s", tr.Record(), rec)
-
-       rec, err = rr.Read()
-       assert.NoError(t, err)
-       tr.Next()
-       assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: 
%s\ngot: %s", tr.Record(), rec)
+       for _, parallel := range []bool{false, true} {
+               t.Run(fmt.Sprintf("parallel=%v", parallel), func(t *testing.T) {
+                       pf, err := 
file.NewParquetReader(bytes.NewReader(buf.Bytes()), 
file.WithReadProps(parquet.NewReaderProperties(mem)))
+                       require.NoError(t, err)
 
-       require.NoError(t, rr.SeekToRow(2))
-       rec, err = rr.Read()
-       assert.NoError(t, err)
-       assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: 
%s\ngot: %s", tr.Record(), rec)
+                       reader, err := pqarrow.NewFileReader(pf, 
pqarrow.ArrowReadProperties{BatchSize: 2, Parallel: parallel}, mem)
+                       require.NoError(t, err)
 
-       require.NoError(t, rr.SeekToRow(4))
-       rec, err = rr.Read()
-       tr.Next()
-       assert.NoError(t, err)
-       assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: 
%s\ngot: %s", tr.Record(), rec)
+                       sc, err := reader.Schema()
+                       assert.NoError(t, err)
+                       assert.Truef(t, tbl.Schema().Equal(sc), "expected: 
%s\ngot: %s", tbl.Schema(), sc)
+
+                       rr, err := reader.GetRecordReader(context.Background(), 
nil, nil)
+                       assert.NoError(t, err)
+                       assert.NotNil(t, rr)
+                       defer rr.Release()
+
+                       tr := array.NewTableReader(tbl, 2)
+                       defer tr.Release()
+
+                       rec, err := rr.Read()
+                       assert.NoError(t, err)
+                       tr.Next()
+                       assert.Truef(t, array.RecordEqual(tr.Record(), rec), 
"expected: %s\ngot: %s", tr.Record(), rec)
+
+                       require.NoError(t, rr.SeekToRow(0))
+                       rec, err = rr.Read()
+                       assert.NoError(t, err)
+                       assert.Truef(t, array.RecordEqual(tr.Record(), rec), 
"expected: %s\ngot: %s", tr.Record(), rec)
+
+                       rec, err = rr.Read()
+                       assert.NoError(t, err)
+                       tr.Next()
+                       assert.Truef(t, array.RecordEqual(tr.Record(), rec), 
"expected: %s\ngot: %s", tr.Record(), rec)
+
+                       require.NoError(t, rr.SeekToRow(2))
+                       rec, err = rr.Read()
+                       assert.NoError(t, err)
+                       assert.Truef(t, array.RecordEqual(tr.Record(), rec), 
"expected: %s\ngot: %s", tr.Record(), rec)
+
+                       require.NoError(t, rr.SeekToRow(4))
+                       rec, err = rr.Read()
+                       tr.Next()
+                       assert.NoError(t, err)
+                       assert.Truef(t, array.RecordEqual(tr.Record(), rec), 
"expected: %s\ngot: %s", tr.Record(), rec)
+               })
+       }
 }
 
 func TestRecordReaderMultiRowGroup(t *testing.T) {

Reply via email to