This is an automated email from the ASF dual-hosted git repository.

npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da5822  ARROW-11794: [Go] Add concurrent-safe ipc.FileReader.RecordAt(i)
4da5822 is described below

commit 4da5822cbb04562f9ced5304cfbbe308c950d040
Author: François Saint-Jacques <[email protected]>
AuthorDate: Fri Feb 26 07:50:29 2021 -0800

    ARROW-11794: [Go] Add concurrent-safe ipc.FileReader.RecordAt(i)
    
    Arrow IPC files are safe to load concurrently. However, the implementation
    of `ipc.FileReader.Record(i)` is not safe for concurrent use because it
    stashes the current record inside the reader. This adds a
    backward-compatible method, `RecordAt`, that behaves like ReadAt: it does
    not stash the returned record, so ownership passes to the caller.
    
    Closes #9584 from fsaintjacques/go-concurrent-file-reader
    
    Authored-by: François Saint-Jacques <[email protected]>
    Signed-off-by: Neal Richardson <[email protected]>
---
 go/arrow/internal/arrdata/ioutil.go | 50 +++++++++++++++++++++++++++++++++++++
 go/arrow/ipc/file_reader.go         | 24 +++++++++++++-----
 go/arrow/ipc/file_test.go           |  1 +
 3 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/go/arrow/internal/arrdata/ioutil.go b/go/arrow/internal/arrdata/ioutil.go
index 7065f64..33aab24 100644
--- a/go/arrow/internal/arrdata/ioutil.go
+++ b/go/arrow/internal/arrdata/ioutil.go
@@ -17,8 +17,10 @@
 package arrdata // import "github.com/apache/arrow/go/arrow/internal/arrdata"
 
 import (
+       "fmt"
        "io"
        "os"
+       "sync"
        "testing"
 
        "github.com/apache/arrow/go/arrow"
@@ -59,6 +61,54 @@ func CheckArrowFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arro
 
 }
 
+func CheckArrowConcurrentFile(t *testing.T, f *os.File, mem memory.Allocator, 
schema *arrow.Schema, recs []array.Record) {
+       t.Helper()
+
+       _, err := f.Seek(0, io.SeekStart)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), 
ipc.WithAllocator(mem))
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer r.Close()
+
+       var g sync.WaitGroup
+       errs := make(chan error, r.NumRecords())
+       checkRecord := func(i int) {
+               defer g.Done()
+               rec, err := r.RecordAt(i)
+               if err != nil {
+                       errs <- fmt.Errorf("could not read record %d: %v", i, 
err)
+                       return
+               }
+               if !array.RecordEqual(rec, recs[i]) {
+                       errs <- fmt.Errorf("records[%d] differ", i)
+               }
+       }
+
+       for i := 0; i < r.NumRecords(); i++ {
+               g.Add(1)
+               go checkRecord(i)
+       }
+
+       g.Wait()
+       close(errs)
+
+       for err := range errs {
+               if err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       err = r.Close()
+       if err != nil {
+               t.Fatal(err)
+       }
+}
+
 // CheckArrowStream checks whether a given ARROW stream contains the expected 
list of records.
 func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema 
*arrow.Schema, recs []array.Record) {
        t.Helper()
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 961803b..cf32448 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -244,6 +244,23 @@ func (f *FileReader) Close() error {
 // The returned value is valid until the next call to Record.
 // Users need to call Retain on that Record to keep it valid for longer.
 func (f *FileReader) Record(i int) (array.Record, error) {
+       record, err := f.RecordAt(i)
+       if err != nil {
+               return nil, err
+       }
+
+       if f.record != nil {
+               f.record.Release()
+       }
+
+       f.record = record
+       return record, nil
+}
+
+// RecordAt returns the i-th record from the file. Ownership is transferred to
+// the caller, who must call Release() to free the memory. This method is safe
+// to call concurrently.
+func (f *FileReader) RecordAt(i int) (array.Record, error) {
        if i < 0 || i > f.NumRecords() {
                panic("arrow/ipc: record index out of bounds")
        }
@@ -271,12 +288,7 @@ func (f *FileReader) Record(i int) (array.Record, error) {
                return nil, xerrors.Errorf("arrow/ipc: message %d is not a 
Record", i)
        }
 
-       if f.record != nil {
-               f.record.Release()
-       }
-
-       f.record = newRecord(f.schema, msg.meta, 
bytes.NewReader(msg.body.Bytes()))
-       return f.record, nil
+       return newRecord(f.schema, msg.meta, 
bytes.NewReader(msg.body.Bytes())), nil
 }
 
 // Read reads the current record from the underlying stream and an error, if 
any.
diff --git a/go/arrow/ipc/file_test.go b/go/arrow/ipc/file_test.go
index 8c5d515..d0ef960 100644
--- a/go/arrow/ipc/file_test.go
+++ b/go/arrow/ipc/file_test.go
@@ -45,6 +45,7 @@ func TestFile(t *testing.T) {
 
                        arrdata.WriteFile(t, f, mem, recs[0].Schema(), recs)
                        arrdata.CheckArrowFile(t, f, mem, recs[0].Schema(), 
recs)
+                       arrdata.CheckArrowConcurrentFile(t, f, mem, 
recs[0].Schema(), recs)
                })
        }
 }

Reply via email to