This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new a7d23a7  fix(arrow/array): Fix RecordFromJSON perf (#449)
a7d23a7 is described below

commit a7d23a73d268f4a0a77376096312ec5621ac1ddb
Author: Matt Topol <[email protected]>
AuthorDate: Fri Jul 25 11:19:57 2025 -0400

    fix(arrow/array): Fix RecordFromJSON perf (#449)
    
    fixes #448
    
    ### Rationale for this change
    When dealing with Unicode in JSON values, `RecordFromJSON` seems to have
    a significant performance slowdown due to an odd interaction of decoders
    with goccy/go-json. `NewJSONReader` doesn't exhibit the issue because it
    essentially creates a new Decoder for each line/record, decoding into a
    RecordBuilder directly.
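    
    For reference, a minimal sketch of the `NewJSONReader` pattern described
    above, reading newline-delimited JSON into chunked records (the helper
    name `readNDJSON` and the chunk size are illustrative, not part of this
    change):
    
    ```go
    // Sketch only. Assumes the arrow, array, and memory packages from
    // github.com/apache/arrow-go/v18 plus "io" are imported.
    func readNDJSON(r io.Reader, schema *arrow.Schema) ([]arrow.Record, error) {
            rdr := array.NewJSONReader(r, schema,
                    array.WithAllocator(memory.DefaultAllocator), array.WithChunk(1024))
            defer rdr.Release()
    
            var recs []arrow.Record
            for rdr.Next() {
                    rec := rdr.Record()
                    rec.Retain() // keep the record alive beyond the next call to Next
                    recs = append(recs, rec)
            }
            return recs, rdr.Err()
    }
    ```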
    
    ### What changes are included in this PR?
    Change `RecordFromJSON` to work more like `NewJSONReader`, decoding
    directly into a `RecordBuilder` so that we sidestep the performance
    problem for large amounts of JSON.
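    
    Calling code is unchanged; a typical use of the public entry point looks
    like this (the schema and inline JSON below are illustrative only):
    
    ```go
    // Sketch only. Assumes the arrow, array, and memory packages from
    // github.com/apache/arrow-go/v18 plus "strings" are imported.
    schema := arrow.NewSchema([]arrow.Field{
            {Name: "id", Type: arrow.PrimitiveTypes.Int64},
            {Name: "name", Type: arrow.BinaryTypes.String},
    }, nil)
    
    rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema,
            strings.NewReader(`[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]`))
    if err != nil {
            // handle error
    }
    defer rec.Release()
    ```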
    
    ### Are these changes tested?
    Yes, benchmarks are added to track the performance of `RecordFromJSON`
    vs `NewJSONReader` on the same data.
    
    ### Are there any user-facing changes?
    Only a performance improvement when the JSON contains large amounts of
    Unicode data.
---
 arrow/array/json_reader_test.go | 122 ++++++++++++++++++++++++++++++++++++++++
 arrow/array/struct.go           |   4 +-
 arrow/array/util.go             |  68 ++++++++++++++++++++--
 3 files changed, 187 insertions(+), 7 deletions(-)

diff --git a/arrow/array/json_reader_test.go b/arrow/array/json_reader_test.go
index c53e5dd..d73120b 100644
--- a/arrow/array/json_reader_test.go
+++ b/arrow/array/json_reader_test.go
@@ -17,12 +17,15 @@
 package array_test
 
 import (
+       "bytes"
+       "fmt"
        "strings"
        "testing"
 
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/internal/json"
        "github.com/stretchr/testify/assert"
 )
 
@@ -139,3 +142,122 @@ func TestUnmarshalJSON(t *testing.T) {
 
        assert.NotNil(t, record)
 }
+
+func generateJSONData(n int) []byte {
+       records := make([]map[string]any, n)
+       for i := range n {
+               records[i] = map[string]any{
+                       "id":       i,
+                       "name":     fmt.Sprintf("record_%d", i),
+                       "value":    float64(i) * 1.5,
+                       "active":   i%2 == 0,
+                       "metadata": fmt.Sprintf("metadata_%d_%s", i, make([]byte, 500)),
+               }
+       }
+
+       data, _ := json.Marshal(records)
+       return data
+}
+
+func jsonArrayToNDJSON(data []byte) ([]byte, error) {
+       var records []json.RawMessage
+       if err := json.Unmarshal(data, &records); err != nil {
+               return nil, err
+       }
+
+       var ndjson bytes.Buffer
+       for _, record := range records {
+               ndjson.Write(record)
+               ndjson.WriteString("\n")
+       }
+
+       return ndjson.Bytes(), nil
+}
+
+func BenchmarkRecordFromJSON(b *testing.B) {
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64},
+               {Name: "name", Type: arrow.BinaryTypes.String},
+               {Name: "value", Type: arrow.PrimitiveTypes.Float64},
+               {Name: "active", Type: arrow.FixedWidthTypes.Boolean},
+               {Name: "metadata", Type: arrow.BinaryTypes.String},
+       }, nil)
+
+       testSizes := []int64{1000, 5000, 10000}
+
+       for _, size := range testSizes {
+               b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) {
+                       data := generateJSONData(int(size))
+                       pool := memory.NewGoAllocator()
+
+                       var rdr bytes.Reader
+                       b.SetBytes(int64(len(data)))
+                       b.ResetTimer()
+                       for range b.N {
+                               rdr.Reset(data)
+
+                               record, _, err := array.RecordFromJSON(pool, schema, &rdr)
+                               if err != nil {
+                                       b.Error(err)
+                               }
+
+                               if record.NumRows() != size {
+                                       b.Errorf("expected %d rows, got %d", size, record.NumRows())
+                               }
+                               record.Release()
+                       }
+               })
+       }
+}
+
+func BenchmarkJSONReader(b *testing.B) {
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64},
+               {Name: "name", Type: arrow.BinaryTypes.String},
+               {Name: "value", Type: arrow.PrimitiveTypes.Float64},
+               {Name: "active", Type: arrow.FixedWidthTypes.Boolean},
+               {Name: "metadata", Type: arrow.BinaryTypes.String},
+       }, nil)
+
+       testSizes := []int64{1000, 5000, 10000}
+
+       for _, size := range testSizes {
+               b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) {
+                       data := generateJSONData(int(size))
+                       data, err := jsonArrayToNDJSON(data)
+                       if err != nil {
+                               b.Fatalf("failed to convert JSON to NDJSON: %v", err)
+                       }
+
+                       var rdr bytes.Reader
+                       for _, chkSize := range []int{-1, int(size / 2), int(size)} {
+                               b.Run(fmt.Sprintf("ChunkSize_%d", chkSize), func(b *testing.B) {
+                                       pool := memory.NewGoAllocator()
+                                       b.SetBytes(int64(len(data)))
+                                       b.ResetTimer()
+                                       for range b.N {
+                                               rdr.Reset(data)
+
+                                               jsonRdr := array.NewJSONReader(&rdr, schema, array.WithAllocator(pool),
+                                                       array.WithChunk(chkSize))
+
+                                               var totalRows int64
+                                               for jsonRdr.Next() {
+                                                       rec := jsonRdr.Record()
+                                                       totalRows += rec.NumRows()
+                                               }
+
+                                               if err := jsonRdr.Err(); err != nil {
+                                                       b.Errorf("error reading JSON: %v", err)
+                                               }
+                                               jsonRdr.Release()
+
+                                               if totalRows != size {
+                                                       b.Errorf("expected %d rows, got %d", size, totalRows)
+                                               }
+                                       }
+                               })
+                       }
+               })
+       }
+}
diff --git a/arrow/array/struct.go b/arrow/array/struct.go
index e0902ee..6883712 100644
--- a/arrow/array/struct.go
+++ b/arrow/array/struct.go
@@ -473,7 +473,9 @@ func (b *StructBuilder) UnmarshalOne(dec *json.Decoder) error {
                        idx, ok := b.dtype.(*arrow.StructType).FieldIdx(key)
                        if !ok {
                                var extra interface{}
-                               dec.Decode(&extra)
+                               if err := dec.Decode(&extra); err != nil {
+                                       return err
+                               }
                                continue
                        }
 
diff --git a/arrow/array/util.go b/arrow/array/util.go
index c8316ab..1d27b2f 100644
--- a/arrow/array/util.go
+++ b/arrow/array/util.go
@@ -210,15 +210,71 @@ func RecordFromStructArray(in *Struct, schema *arrow.Schema) arrow.Record {
 //
 // A record batch from JSON is equivalent to reading a struct array in from json and then
 // converting it to a record batch.
+//
+// See https://github.com/apache/arrow-go/issues/448 for more details on
+// why this isn't a simple wrapper around FromJSON.
 func RecordFromJSON(mem memory.Allocator, schema *arrow.Schema, r io.Reader, opts ...FromJSONOption) (arrow.Record, int64, error) {
-       st := arrow.StructOf(schema.Fields()...)
-       arr, off, err := FromJSON(mem, st, r, opts...)
-       if err != nil {
-               return nil, off, err
+       var cfg fromJSONCfg
+       for _, o := range opts {
+               o(&cfg)
+       }
+
+       if cfg.startOffset != 0 {
+               seeker, ok := r.(io.ReadSeeker)
+               if !ok {
+                       return nil, 0, errors.New("using StartOffset option requires reader to be a ReadSeeker, cannot seek")
+               }
+               if _, err := seeker.Seek(cfg.startOffset, io.SeekStart); err != nil {
+                       return nil, 0, fmt.Errorf("failed to seek to start offset %d: %w", cfg.startOffset, err)
+               }
+       }
+
+       if mem == nil {
+               mem = memory.DefaultAllocator
+       }
+
+       bldr := NewRecordBuilder(mem, schema)
+       defer bldr.Release()
+
+       dec := json.NewDecoder(r)
+       if cfg.useNumber {
+               dec.UseNumber()
+       }
+
+       if !cfg.multiDocument {
+               t, err := dec.Token()
+               if err != nil {
+                       return nil, dec.InputOffset(), err
+               }
+               if delim, ok := t.(json.Delim); !ok || delim != '[' {
+                       return nil, dec.InputOffset(), fmt.Errorf("json doc must be an array, found %s", delim)
+               }
+
+               for dec.More() {
+                       if err := dec.Decode(bldr); err != nil {
+                               return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
+                       }
+               }
+
+               // consume the last ']'
+               if _, err = dec.Token(); err != nil {
+                       return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
+               }
+
+               return bldr.NewRecord(), dec.InputOffset(), nil
+       }
+
+       for {
+               err := dec.Decode(bldr)
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               break
+                       }
+                       return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
+               }
        }
-       defer arr.Release()
 
-       return RecordFromStructArray(arr.(*Struct), schema), off, nil
+       return bldr.NewRecord(), dec.InputOffset(), nil
 }
 
 // RecordToJSON writes out the given record following the format of each row is a single object
