This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git
The following commit(s) were added to refs/heads/main by this push:
new a7d23a7 fix(arrow/array): Fix RecordFromJSON perf (#449)
a7d23a7 is described below
commit a7d23a73d268f4a0a77376096312ec5621ac1ddb
Author: Matt Topol <[email protected]>
AuthorDate: Fri Jul 25 11:19:57 2025 -0400
fix(arrow/array): Fix RecordFromJSON perf (#449)
fixes #448
### Rationale for this change
When JSON values contain Unicode data, `RecordFromJSON` suffers a
significant performance slowdown due to an odd interaction of decoders
with goccy/go-json. `NewJSONReader` doesn't exhibit the issue because it
effectively creates a new `json.Decoder` for each line/record,
decoding into a `RecordBuilder` directly.
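For context, a minimal sketch of the affected entry point; the one-field
schema and the input data here are illustrative, not taken from this PR:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "name", Type: arrow.BinaryTypes.String}, // illustrative field
	}, nil)

	// A JSON document that is an array of objects, one object per row.
	// Unicode-escaped values like these are the kind of input that
	// triggered the slowdown in #448.
	data := `[{"name": "caf\u00e9"}, {"name": "na\u00efve"}]`

	rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema, strings.NewReader(data))
	if err != nil {
		panic(err)
	}
	defer rec.Release()
	fmt.Println(rec.NumRows()) // 2
}
```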
### What changes are included in this PR?
Change `RecordFromJSON` to work more like `NewJSONReader`: decode
directly into a `RecordBuilder` so that we sidestep the performance
problem for large amounts of JSON.
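As a sketch of the pattern being adopted, this is roughly how
`NewJSONReader` is consumed (see also the benchmarks below); the schema
and the line-delimited input are illustrative:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "name", Type: arrow.BinaryTypes.String}, // illustrative field
	}, nil)

	// Line-delimited JSON: one record per line, each decoded straight
	// into the reader's internal RecordBuilder.
	ndjson := "{\"name\": \"a\"}\n{\"name\": \"b\"}\n"

	rdr := array.NewJSONReader(strings.NewReader(ndjson), schema,
		array.WithAllocator(memory.DefaultAllocator))
	defer rdr.Release()

	for rdr.Next() {
		fmt.Println(rdr.Record().NumRows())
	}
	if err := rdr.Err(); err != nil {
		panic(err)
	}
}
```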
### Are these changes tested?
Yes, benchmarks are added to track the performance of `RecordFromJSON`
vs. `NewJSONReader` on the same data.
### Are there any user-facing changes?
Only a performance improvement when JSON contains large amounts of
Unicode data.
---
arrow/array/json_reader_test.go | 122 ++++++++++++++++++++++++++++++++++++++++
arrow/array/struct.go | 4 +-
arrow/array/util.go | 68 ++++++++++++++++++++--
3 files changed, 187 insertions(+), 7 deletions(-)
diff --git a/arrow/array/json_reader_test.go b/arrow/array/json_reader_test.go
index c53e5dd..d73120b 100644
--- a/arrow/array/json_reader_test.go
+++ b/arrow/array/json_reader_test.go
@@ -17,12 +17,15 @@
package array_test
import (
+ "bytes"
+ "fmt"
"strings"
"testing"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/internal/json"
"github.com/stretchr/testify/assert"
)
@@ -139,3 +142,122 @@ func TestUnmarshalJSON(t *testing.T) {
assert.NotNil(t, record)
}
+
+func generateJSONData(n int) []byte {
+ records := make([]map[string]any, n)
+ for i := range n {
+ records[i] = map[string]any{
+ "id": i,
+ "name": fmt.Sprintf("record_%d", i),
+ "value": float64(i) * 1.5,
+ "active": i%2 == 0,
+ "metadata": fmt.Sprintf("metadata_%d_%s", i,
make([]byte, 500)),
+ }
+ }
+
+ data, _ := json.Marshal(records)
+ return data
+}
+
+func jsonArrayToNDJSON(data []byte) ([]byte, error) {
+ var records []json.RawMessage
+ if err := json.Unmarshal(data, &records); err != nil {
+ return nil, err
+ }
+
+ var ndjson bytes.Buffer
+ for _, record := range records {
+ ndjson.Write(record)
+ ndjson.WriteString("\n")
+ }
+
+ return ndjson.Bytes(), nil
+}
+
+func BenchmarkRecordFromJSON(b *testing.B) {
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64},
+ {Name: "name", Type: arrow.BinaryTypes.String},
+ {Name: "value", Type: arrow.PrimitiveTypes.Float64},
+ {Name: "active", Type: arrow.FixedWidthTypes.Boolean},
+ {Name: "metadata", Type: arrow.BinaryTypes.String},
+ }, nil)
+
+ testSizes := []int64{1000, 5000, 10000}
+
+ for _, size := range testSizes {
+ b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) {
+ data := generateJSONData(int(size))
+ pool := memory.NewGoAllocator()
+
+ var rdr bytes.Reader
+ b.SetBytes(int64(len(data)))
+ b.ResetTimer()
+ for range b.N {
+ rdr.Reset(data)
+
+ record, _, err := array.RecordFromJSON(pool, schema, &rdr)
+ if err != nil {
+ b.Error(err)
+ }
+
+ if record.NumRows() != size {
+ b.Errorf("expected %d rows, got %d",
size, record.NumRows())
+ }
+ record.Release()
+ }
+ })
+ }
+}
+
+func BenchmarkJSONReader(b *testing.B) {
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int64},
+ {Name: "name", Type: arrow.BinaryTypes.String},
+ {Name: "value", Type: arrow.PrimitiveTypes.Float64},
+ {Name: "active", Type: arrow.FixedWidthTypes.Boolean},
+ {Name: "metadata", Type: arrow.BinaryTypes.String},
+ }, nil)
+
+ testSizes := []int64{1000, 5000, 10000}
+
+ for _, size := range testSizes {
+ b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) {
+ data := generateJSONData(int(size))
+ data, err := jsonArrayToNDJSON(data)
+ if err != nil {
+ b.Fatalf("failed to convert JSON to NDJSON:
%v", err)
+ }
+
+ var rdr bytes.Reader
+ for _, chkSize := range []int{-1, int(size / 2), int(size)} {
+ b.Run(fmt.Sprintf("ChunkSize_%d", chkSize), func(b *testing.B) {
+ pool := memory.NewGoAllocator()
+ b.SetBytes(int64(len(data)))
+ b.ResetTimer()
+ for range b.N {
+ rdr.Reset(data)
+
+ jsonRdr := array.NewJSONReader(&rdr, schema, array.WithAllocator(pool),
+ array.WithChunk(chkSize))
+
+ var totalRows int64
+ for jsonRdr.Next() {
+ rec := jsonRdr.Record()
+ totalRows += rec.NumRows()
+ }
+
+ if err := jsonRdr.Err(); err != nil {
+ b.Errorf("error reading
JSON: %v", err)
+ }
+ jsonRdr.Release()
+
+ if totalRows != size {
+ b.Errorf("expected %d
rows, got %d", size, totalRows)
+ }
+ }
+ })
+ }
+ })
+ }
+}
diff --git a/arrow/array/struct.go b/arrow/array/struct.go
index e0902ee..6883712 100644
--- a/arrow/array/struct.go
+++ b/arrow/array/struct.go
@@ -473,7 +473,9 @@ func (b *StructBuilder) UnmarshalOne(dec *json.Decoder) error {
idx, ok := b.dtype.(*arrow.StructType).FieldIdx(key)
if !ok {
var extra interface{}
- dec.Decode(&extra)
+ if err := dec.Decode(&extra); err != nil {
+ return err
+ }
continue
}
diff --git a/arrow/array/util.go b/arrow/array/util.go
index c8316ab..1d27b2f 100644
--- a/arrow/array/util.go
+++ b/arrow/array/util.go
@@ -210,15 +210,71 @@ func RecordFromStructArray(in *Struct, schema *arrow.Schema) arrow.Record {
//
// A record batch from JSON is equivalent to reading a struct array in from json and then
// converting it to a record batch.
+//
+// See https://github.com/apache/arrow-go/issues/448 for more details on
+// why this isn't a simple wrapper around FromJSON.
func RecordFromJSON(mem memory.Allocator, schema *arrow.Schema, r io.Reader, opts ...FromJSONOption) (arrow.Record, int64, error) {
- st := arrow.StructOf(schema.Fields()...)
- arr, off, err := FromJSON(mem, st, r, opts...)
- if err != nil {
- return nil, off, err
+ var cfg fromJSONCfg
+ for _, o := range opts {
+ o(&cfg)
+ }
+
+ if cfg.startOffset != 0 {
+ seeker, ok := r.(io.ReadSeeker)
+ if !ok {
+ return nil, 0, errors.New("using StartOffset option requires reader to be a ReadSeeker, cannot seek")
+ }
+ if _, err := seeker.Seek(cfg.startOffset, io.SeekStart); err != nil {
+ return nil, 0, fmt.Errorf("failed to seek to start offset %d: %w", cfg.startOffset, err)
+ }
+ }
+
+ if mem == nil {
+ mem = memory.DefaultAllocator
+ }
+
+ bldr := NewRecordBuilder(mem, schema)
+ defer bldr.Release()
+
+ dec := json.NewDecoder(r)
+ if cfg.useNumber {
+ dec.UseNumber()
+ }
+
+ if !cfg.multiDocument {
+ t, err := dec.Token()
+ if err != nil {
+ return nil, dec.InputOffset(), err
+ }
+ if delim, ok := t.(json.Delim); !ok || delim != '[' {
+ return nil, dec.InputOffset(), fmt.Errorf("json doc
must be an array, found %s", delim)
+ }
+
+ for dec.More() {
+ if err := dec.Decode(bldr); err != nil {
+ return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
+ }
+ }
+
+ // consume the last ']'
+ if _, err = dec.Token(); err != nil {
+ return nil, dec.InputOffset(), fmt.Errorf("failed to
decode json: %w", err)
+ }
+
+ return bldr.NewRecord(), dec.InputOffset(), nil
+ }
+
+ for {
+ err := dec.Decode(bldr)
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ return nil, dec.InputOffset(), fmt.Errorf("failed to
decode json: %w", err)
+ }
}
- defer arr.Release()
- return RecordFromStructArray(arr.(*Struct), schema), off, nil
+ return bldr.NewRecord(), dec.InputOffset(), nil
}
// RecordToJSON writes out the given record following the format of each row is a single object