This is an automated email from the ASF dual-hosted git repository.
npr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 4da5822 ARROW-11794: [Go] Add concurrent-safe ipc.FileReader.RecordAt(i)
4da5822 is described below
commit 4da5822cbb04562f9ced5304cfbbe308c950d040
Author: François Saint-Jacques <[email protected]>
AuthorDate: Fri Feb 26 07:50:29 2021 -0800
ARROW-11794: [Go] Add concurrent-safe ipc.FileReader.RecordAt(i)
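Arrow IPC files are safe to load concurrently. The implementation of
`ipc.FileReader.Record(i)` is not safe for concurrent use because it stashes
the current record internally. This adds a backward-compatible method,
`RecordAt`, which, like `io.ReaderAt.ReadAt`, may be called concurrently; the
caller owns the returned record and must call Release() on it.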
Closes #9584 from fsaintjacques/go-concurrent-file-reader
Authored-by: François Saint-Jacques <[email protected]>
Signed-off-by: Neal Richardson <[email protected]>
---
go/arrow/internal/arrdata/ioutil.go | 50 +++++++++++++++++++++++++++++++++++++
go/arrow/ipc/file_reader.go | 24 +++++++++++++-----
go/arrow/ipc/file_test.go | 1 +
3 files changed, 69 insertions(+), 6 deletions(-)
diff --git a/go/arrow/internal/arrdata/ioutil.go
b/go/arrow/internal/arrdata/ioutil.go
index 7065f64..33aab24 100644
--- a/go/arrow/internal/arrdata/ioutil.go
+++ b/go/arrow/internal/arrdata/ioutil.go
@@ -17,8 +17,10 @@
package arrdata // import "github.com/apache/arrow/go/arrow/internal/arrdata"
import (
+ "fmt"
"io"
"os"
+ "sync"
"testing"
"github.com/apache/arrow/go/arrow"
@@ -59,6 +61,54 @@ func CheckArrowFile(t *testing.T, f *os.File, mem
memory.Allocator, schema *arro
}
+// CheckArrowConcurrentFile checks whether a given ARROW file contains the
+// expected list of records, reading every record concurrently through
+// FileReader.RecordAt.
+func CheckArrowConcurrentFile(t *testing.T, f *os.File, mem memory.Allocator, schema *arrow.Schema, recs []array.Record) {
+	t.Helper()
+
+	_, err := f.Seek(0, io.SeekStart)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	r, err := ipc.NewFileReader(f, ipc.WithSchema(schema), ipc.WithAllocator(mem))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Close()
+
+	// Fail here rather than letting recs[i] panic inside a goroutine, which
+	// would abort the whole test binary instead of failing this test.
+	if got, want := r.NumRecords(), len(recs); got != want {
+		t.Fatalf("invalid number of records. got=%d, want=%d", got, want)
+	}
+
+	var g sync.WaitGroup
+	// Buffered so no goroutine blocks on send: each sends at most one error.
+	errs := make(chan error, r.NumRecords())
+	checkRecord := func(i int) {
+		defer g.Done()
+		rec, err := r.RecordAt(i)
+		if err != nil {
+			errs <- fmt.Errorf("could not read record %d: %v", i, err)
+			return
+		}
+		// RecordAt transfers ownership to the caller, so release it here.
+		defer rec.Release()
+		if !array.RecordEqual(rec, recs[i]) {
+			errs <- fmt.Errorf("records[%d] differ", i)
+		}
+	}
+
+	for i := 0; i < r.NumRecords(); i++ {
+		g.Add(1)
+		go checkRecord(i)
+	}
+
+	g.Wait()
+	close(errs)
+
+	// Report every failing record, not just the first one received.
+	for err := range errs {
+		t.Error(err)
+	}
+
+	err = r.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
// CheckArrowStream checks whether a given ARROW stream contains the expected list of records.
func CheckArrowStream(t *testing.T, f *os.File, mem memory.Allocator, schema
*arrow.Schema, recs []array.Record) {
t.Helper()
diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go
index 961803b..cf32448 100644
--- a/go/arrow/ipc/file_reader.go
+++ b/go/arrow/ipc/file_reader.go
@@ -244,6 +244,23 @@ func (f *FileReader) Close() error {
// The returned value is valid until the next call to Record.
// Users need to call Retain on that Record to keep it valid for longer.
func (f *FileReader) Record(i int) (array.Record, error) {
+	rec, err := f.RecordAt(i)
+	if err != nil {
+		return nil, err
+	}
+
+	// Record keeps the most recently returned value on the reader and releases
+	// the one it replaces. This shared state is why Record, unlike RecordAt,
+	// must not be called concurrently.
+	prev := f.record
+	f.record = rec
+	if prev != nil {
+		prev.Release()
+	}
+	return rec, nil
+}
+
+// RecordAt returns the i-th record from the file. Ownership is transferred to the
+// caller, who must call Release() to free the memory. This method is safe to
+// call concurrently.
+func (f *FileReader) RecordAt(i int) (array.Record, error) {
if i < 0 || i > f.NumRecords() {
panic("arrow/ipc: record index out of bounds")
}
@@ -271,12 +288,7 @@ func (f *FileReader) Record(i int) (array.Record, error) {
return nil, xerrors.Errorf("arrow/ipc: message %d is not a
Record", i)
}
- if f.record != nil {
- f.record.Release()
- }
-
- f.record = newRecord(f.schema, msg.meta,
bytes.NewReader(msg.body.Bytes()))
- return f.record, nil
+ return newRecord(f.schema, msg.meta,
bytes.NewReader(msg.body.Bytes())), nil
}
// Read reads the current record from the underlying stream and an error, if
any.
diff --git a/go/arrow/ipc/file_test.go b/go/arrow/ipc/file_test.go
index 8c5d515..d0ef960 100644
--- a/go/arrow/ipc/file_test.go
+++ b/go/arrow/ipc/file_test.go
@@ -45,6 +45,7 @@ func TestFile(t *testing.T) {
arrdata.WriteFile(t, f, mem, recs[0].Schema(), recs)
arrdata.CheckArrowFile(t, f, mem, recs[0].Schema(),
recs)
+ arrdata.CheckArrowConcurrentFile(t, f, mem,
recs[0].Schema(), recs)
})
}
}