caldempsey commented on PR #449:
URL: https://github.com/apache/arrow-go/pull/449#issuecomment-3117538917

   Ran a test using data that's not quite production, but closer. The use case 
involves ingesting transcripts of audio files via Spark Connect. These 
transcripts can be quite large, up to 4MB each. We're streaming this data into 
a data lake using Spark Connect instead of traditional Spark jobs, which allows 
us to receive synchronous responses at the call site and support coordination 
patterns (GET/POST) using standard REST consumers (which is _so_ much better 
for error handling). Hence the massive strings. Here are some benchmarks that 
compare your changes against Goccy's decode method. 
   
   The reason I chose Goccy's decode method as this test to identify the root 
cause is if you follow the callsite trail, it _forces_ a call to Goccy's string 
decoder:
   
   ```go
   func (d *stringDecoder) DecodeStream(s *Stream, depth int64, p 
unsafe.Pointer) error {
        bytes, err := d.decodeStreamByte(s)
        if err != nil {
                return err
        }
        if bytes == nil {
                return nil
        }
        **(**string)(unsafe.Pointer(&p)) = *(*string)(unsafe.Pointer(&bytes))
        s.reset()
        return nil
   }
   ```
   
   In the 4MB test with or without 'special' or more likely to be escaped 
unicode characters:
   
   ```
   4MB_large_payload_1000_records_non_az_chars:
     Records: 1000
     Target record size: 4000000 bytes
     JSON Array size: 3814.95 MB
     NDJSON size: 3814.95 MB
     RecordFromJSON:            20.802824917s (183.39 MB/s, 48 rec/s)
     JSONReader (chunked):      16.895239958s (225.80 MB/s, 59 rec/s)
     JSONReader (single chunk): 15.627604875s (244.12 MB/s, 64 rec/s)
     GoccyJSONDecoder:            8.260023459s (461.86 MB/s, 121 rec/s)
     // Goccy Decode DNF 
   ```
   
   We **can* observe the slowness if we let it finish in the tiny payload test:
   
   ```
   tiny payload:
     Records: 1000
     Target record size: 10240 bytes
     JSON Array size: 10.01 MB
     NDJSON size: 10.01 MB
     RecordFromJSON:              40.55425ms (246.84 MB/s, 24658 rec/s)
     JSONReader (chunked):       37.439792ms (267.37 MB/s, 26710 rec/s)
     JSONReader (single chunk):  36.667208ms (273.00 MB/s, 27272 rec/s)
     GoccyJSONUnmarshal:             15.728292ms (636.45 MB/s, 63580 rec/s)
     GoccyJSONDecoder:            1.866923375s (636.45 MB/s, 63580 rec/s) // 
phwor 
   --- PASS: TestFileLabelsPayloads/tiny_payload (2.05s)
   --- PASS: TestFileLabelsPayloads (2.05s)
   PASS
   ``` 
   
   So I think we've latched onto the root cause and can show RecordFromJSON is 
now dealing gracefully.
   
   ```go
   package main_test
   
   import (
        "bytes"
        "encoding/json"
        "fmt"
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        gojson "github.com/goccy/go-json"
        "github.com/google/uuid"
        "strings"
        "testing"
        "time"
   )
   
   // FileLabel represents the structure from the Python benchmark
   type FileLabel struct {
        FilePath    string  `json:"file_path"`
        SomeLabel   string  `json:"some_label"`
        Confidence  float64 `json:"confidence"`
        ModelName   string  `json:"model_name"`
        ProcessedAt string  `json:"processed_at"`
        BatchID     string  `json:"batch_id"`
        Metadata    string  `json:"metadata"` // JSON string
   }
   
   // MetadataContent represents the metadata structure
   type MetadataContent struct {
        ProcessingTimeMs int    `json:"processing_time_ms"`
        Version          string `json:"version"`
        Padding          string `json:"padding"`
        RecordID         int    `json:"record_id"`
        ExtraField1      string `json:"extra_field_1"`
        ExtraField2      string `json:"extra_field_2"`
        ExtraField3      string `json:"extra_field_3"`
   }
   
   // makePadding creates a padding string of the specified size
   func makePadding(size int, useNonAZChars bool) string {
        alphabet := "abcdefghijklmnopqrstuvwxyz"
        if useNonAZChars {
                alphabet = "世界你好"
        }
        repeats := size/len(alphabet) + 1
        return strings.Repeat(alphabet, repeats)[:size]
   }
   
   // genPayload
   func genPayload(numRecords int, recordSizeBytes int, useNonAZChars bool) 
[]FileLabel {
        batchID := uuid.New().String()
        baseRecordSize := 200
        metadataSize := recordSizeBytes - baseRecordSize
        if metadataSize < 100 {
                metadataSize = 100
        }
        padding := makePadding(metadataSize, useNonAZChars)
        nowISO := time.Now().UTC().Format(time.RFC3339)
   
        payload := make([]FileLabel, numRecords)
        for i := 0; i < numRecords; i++ {
                metadata := MetadataContent{
                        ProcessingTimeMs: 150 + (i % 100),
                        Version:          "1.0",
                        Padding:          padding,
                        RecordID:         i,
                        ExtraField1:      fmt.Sprintf("extra_value_%d", i),
                        ExtraField2:      fmt.Sprintf("extra_value_%d", i*2),
                        ExtraField3:      fmt.Sprintf("extra_value_%d", i*3),
                }
   
                metadataJSON, _ := json.Marshal(metadata)
   
                payload[i] = FileLabel{
                        FilePath:    
fmt.Sprintf("s3://test-bucket/batch-%s/file-%d.jpg", batchID, i),
                        SomeLabel:   fmt.Sprintf("label_%d", i%10),
                        Confidence:  0.85 + float64(i%15)/100.0,
                        ModelName:   fmt.Sprintf("model_v%d", (i%5)+1),
                        ProcessedAt: nowISO,
                        BatchID:     batchID,
                        Metadata:    string(metadataJSON),
                }
        }
        return payload
   }
   
   // payloadToNDJSON converts payload to newline-delimited JSON bytes
   func payloadToNDJSON(payload []FileLabel) []byte {
        var buf bytes.Buffer
        encoder := json.NewEncoder(&buf)
        for _, record := range payload {
                encoder.Encode(record)
        }
        return buf.Bytes()
   }
   
   // payloadToJSONArray converts payload to JSON array format
   func payloadToJSONArray(payload []FileLabel) []byte {
        data, _ := json.Marshal(payload)
        return data
   }
   
   // Define Arrow schema matching the FileLabel structure
   func getFileLabelsSchema() *arrow.Schema {
        return arrow.NewSchema([]arrow.Field{
                {Name: "file_path", Type: arrow.BinaryTypes.String},
                {Name: "some_label", Type: arrow.BinaryTypes.String},
                {Name: "confidence", Type: arrow.PrimitiveTypes.Float64},
                {Name: "model_name", Type: arrow.BinaryTypes.String},
                {Name: "processed_at", Type: arrow.BinaryTypes.String},
                {Name: "batch_id", Type: arrow.BinaryTypes.String},
                {Name: "metadata", Type: arrow.BinaryTypes.String},
        }, nil)
   }
   
   func benchmarkRecordFromJSON(data []byte, schema *arrow.Schema) 
(time.Duration, int64, error) {
        pool := memory.NewGoAllocator()
   
        start := time.Now()
        record, _, err := array.RecordFromJSON(pool, schema, 
bytes.NewReader(data))
        duration := time.Since(start)
   
        if err != nil {
                return duration, 0, err
        }
   
        numRows := record.NumRows()
        record.Release()
   
        return duration, numRows, nil
   }
   
   func benchmarkGoccyJSONDecoder(jsonData []byte) (time.Duration, int, error) {
        start := time.Now()
   
        var records []FileLabel
   
        decoder := gojson.NewDecoder(bytes.NewReader(jsonData))
        if err := decoder.Decode(&records); err != nil {
                return 0, 0, err
        }
   
        return time.Since(start), len(records), nil
   }
   
   func benchmarkGoccyUnmarshal(jsonData []byte) (time.Duration, int, error) {
        start := time.Now()
   
        var records []FileLabel
   
        err := gojson.Unmarshal(jsonData, &records)
   
        return time.Since(start), len(records), err
   }
   
   func benchmarkJSONReader(ndjsonData []byte, schema *arrow.Schema) 
(time.Duration, int64, error) {
        pool := memory.NewGoAllocator()
   
        start := time.Now()
   
        rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
                array.WithAllocator(pool))
        defer rdr.Release()
   
        var totalRows int64
        for rdr.Next() {
                rec := rdr.Record()
                totalRows += rec.NumRows()
        }
   
        if err := rdr.Err(); err != nil {
                return time.Since(start), totalRows, err
        }
   
        duration := time.Since(start)
        return duration, totalRows, nil
   }
   
   func benchmarkJSONReaderSingleChunk(ndjsonData []byte, schema *arrow.Schema) 
(time.Duration, int64, error) {
        pool := memory.NewGoAllocator()
   
        start := time.Now()
   
        rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
                array.WithAllocator(pool),
                array.WithChunk(-1))
        defer rdr.Release()
   
        if !rdr.Next() {
                return time.Since(start), 0, fmt.Errorf("no record found")
        }
   
        rec := rdr.Record()
        numRows := rec.NumRows()
   
        duration := time.Since(start)
        return duration, numRows, nil
   }
   
   // BenchmarkScenario represents a test scenario
   type BenchmarkScenario struct {
        Name            string
        NumRecords      int
        RecordSizeBytes int
        UseNonAZChars   bool
   }
   
   func TestFileLabelsPayloads(t *testing.T) {
        scenarios := []BenchmarkScenario{
                {
                        Name:            "tiny payload",
                        NumRecords:      1000,
                        RecordSizeBytes: 10_240,
                        UseNonAZChars:   false,
                },
                {
                        Name:            "large_payload_50k_records",
                        NumRecords:      50_000,
                        RecordSizeBytes: 10_240,
                        UseNonAZChars:   false,
                },
                {
                        Name:            "4MB_large_payload_1000_records",
                        NumRecords:      1000,
                        RecordSizeBytes: 4000000,
                        UseNonAZChars:   false,
                },
                {
                        Name:            
"large_payload_50k_records_non_az_chars",
                        NumRecords:      50_000,
                        RecordSizeBytes: 10_240,
                        UseNonAZChars:   true,
                },
                {
                        Name:            
"4MB_large_payload_1000_records_non_az_chars",
                        NumRecords:      1000,
                        RecordSizeBytes: 4000000,
                        UseNonAZChars:   true,
                },
        }
   
        schema := getFileLabelsSchema()
   
        fmt.Println("Spark Connect API Payload Benchmarks")
        fmt.Println("==================================")
   
        for _, scenario := range scenarios {
                t.Run(scenario.Name, func(t *testing.T) {
                        // Generate payload
                        payload := genPayload(scenario.NumRecords, 
scenario.RecordSizeBytes, scenario.UseNonAZChars)
   
                        // Convert to both formats
                        jsonArrayData := payloadToJSONArray(payload)
                        ndjsonData := payloadToNDJSON(payload)
   
                        jsonArraySizeMB := float64(len(jsonArrayData)) / (1024 
* 1024)
                        ndjsonSizeMB := float64(len(ndjsonData)) / (1024 * 1024)
   
                        fmt.Printf("\n%s:\n", scenario.Name)
                        fmt.Printf("  Records: %d\n", scenario.NumRecords)
                        fmt.Printf("  Target record size: %d bytes\n", 
scenario.RecordSizeBytes)
                        fmt.Printf("  JSON Array size: %.2f MB\n", 
jsonArraySizeMB)
                        fmt.Printf("  NDJSON size: %.2f MB\n", ndjsonSizeMB)
   
                        // Benchmark RecordFromJSON (expects JSON array)
                        duration1, rows1, err1 := 
benchmarkRecordFromJSON(jsonArrayData, schema)
                        if err1 != nil {
                                t.Errorf("RecordFromJSON failed: %v", err1)
                        } else {
                                throughput1 := jsonArraySizeMB / 
duration1.Seconds()
                                recordsPerSec1 := float64(rows1) / 
duration1.Seconds()
                                fmt.Printf("  RecordFromJSON:            %12v 
(%.2f MB/s, %.0f rec/s)\n",
                                        duration1, throughput1, recordsPerSec1)
                        }
                        // Benchmark JSONReader with NDJSON (default chunking)
                        duration2, rows2, err2 := 
benchmarkJSONReader(ndjsonData, schema)
                        if err2 != nil {
                                t.Errorf("JSONReader (chunked) failed: %v", 
err2)
                        } else {
                                throughput2 := ndjsonSizeMB / 
duration2.Seconds()
                                recordsPerSec2 := float64(rows2) / 
duration2.Seconds()
                                fmt.Printf("  JSONReader (chunked):      %12v 
(%.2f MB/s, %.0f rec/s)\n",
                                        duration2, throughput2, recordsPerSec2)
                        }
   
                        // Benchmark JSONReader with NDJSON (single chunk)
                        duration3, rows3, err3 := 
benchmarkJSONReaderSingleChunk(ndjsonData, schema)
                        if err3 != nil {
                                t.Errorf("JSONReader (single chunk) failed: 
%v", err3)
                        } else {
                                throughput3 := ndjsonSizeMB / 
duration3.Seconds()
                                recordsPerSec3 := float64(rows3) / 
duration3.Seconds()
                                fmt.Printf("  JSONReader (single chunk): %12v 
(%.2f MB/s, %.0f rec/s)\n",
                                        duration3, throughput3, recordsPerSec3)
                        }
                        // Benchmark Goccy JSON deserialization
                        duration4, rows4, err4 := 
benchmarkGoccyUnmarshal(jsonArrayData)
                        if err4 != nil {
                                t.Errorf("GoccyJSONUnmarshal failed: %v", err1)
                        } else {
                                throughput4 := jsonArraySizeMB / 
duration4.Seconds()
                                recordsPerSec4 := float64(rows4) / 
duration4.Seconds()
                                fmt.Printf("  GoccyJSONUnmarshal:            
%12v (%.2f MB/s, %.0f rec/s)\n",
                                        duration4, throughput4, recordsPerSec4)
                        }
                        // Benchmark Goccy Decoder (expects JSON array)
                        duration5, rows5, err5 := 
benchmarkGoccyJSONDecoder(jsonArrayData)
                        if err5 != nil {
                                t.Errorf("GoccyJSONDecoder failed: %v", err1)
                        } else {
                                throughput5 := jsonArraySizeMB / 
duration4.Seconds()
                                recordsPerSec5 := float64(rows5) / 
duration4.Seconds()
                                fmt.Printf("  GoccyJSONDecoder:            %12v 
(%.2f MB/s, %.0f rec/s)\n",
                                        duration5, throughput5, recordsPerSec5)
                        }
                })
        }
   }
   ```
   
   So looks like the root cause was determined. I think the only way this could 
be improved would be to have some way of enabling concurrent record processing: 
there's not a lot of reason why we can't use multiple threads as long as we can 
stitch the response back together w/ the right indices. 
   
   LGTM 🚀 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to