caldempsey commented on PR #449:
URL: https://github.com/apache/arrow-go/pull/449#issuecomment-3117538917
Ran a test using data that's not quite production, but close to it. The use case
involves ingesting transcripts of audio files via Spark Connect. These
transcripts can be quite large, up to 4 MB each. We're streaming this data into
a data lake using Spark Connect instead of traditional Spark jobs, which lets
us receive synchronous responses at the call site and support coordination
patterns (GET/POST) with standard REST consumers (which is _so_ much better
for error handling). Hence the massive strings. Here are some benchmarks
comparing your changes against Goccy's decode method.
I chose Goccy's `Decoder.Decode` as the test for isolating the root cause
because, if you follow the call-site trail, it _forces_ a call into Goccy's
string decoder:
```go
func (d *stringDecoder) DecodeStream(s *Stream, depth int64, p unsafe.Pointer) error {
	bytes, err := d.decodeStreamByte(s)
	if err != nil {
		return err
	}
	if bytes == nil {
		return nil
	}
	**(**string)(unsafe.Pointer(&p)) = *(*string)(unsafe.Pointer(&bytes))
	s.reset()
	return nil
}
```
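For anyone who wants to poke at this without the full benchmark below, here's a minimal way to drive both code paths in isolation. This is just a sketch using goccy's public API; the `transcript` field name is illustrative:
```go
package main

import (
	"bytes"
	"fmt"
	"strings"
	"time"

	gojson "github.com/goccy/go-json"
)

func main() {
	// One record with a multi-MB string value, mimicking a transcript field.
	doc := []byte(`[{"transcript":"` + strings.Repeat("a", 4_000_000) + `"}]`)
	var rows []struct {
		Transcript string `json:"transcript"`
	}

	// Streaming path: Decoder.Decode drives stringDecoder.DecodeStream above.
	start := time.Now()
	if err := gojson.NewDecoder(bytes.NewReader(doc)).Decode(&rows); err != nil {
		panic(err)
	}
	fmt.Println("Decoder.Decode:", time.Since(start))

	// Buffered path: same bytes, but Unmarshal skips the stream machinery.
	start = time.Now()
	if err := gojson.Unmarshal(doc, &rows); err != nil {
		panic(err)
	}
	fmt.Println("Unmarshal:     ", time.Since(start))
}
```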
In the 4 MB test, with or without 'special' (i.e. more likely to be escaped)
Unicode characters:
```
4MB_large_payload_1000_records_non_az_chars:
  Records: 1000
  Target record size: 4000000 bytes
  JSON Array size: 3814.95 MB
  NDJSON size: 3814.95 MB
  RecordFromJSON:            20.802824917s (183.39 MB/s, 48 rec/s)
  JSONReader (chunked):      16.895239958s (225.80 MB/s, 59 rec/s)
  JSONReader (single chunk): 15.627604875s (244.12 MB/s, 64 rec/s)
  GoccyJSONDecoder:          8.260023459s (461.86 MB/s, 121 rec/s)
// Goccy Decode: DNF (did not finish)
```
We _can_ observe the slowness if we let it finish, in the tiny payload test:
```
tiny payload:
  Records: 1000
  Target record size: 10240 bytes
  JSON Array size: 10.01 MB
  NDJSON size: 10.01 MB
  RecordFromJSON:            40.55425ms (246.84 MB/s, 24658 rec/s)
  JSONReader (chunked):      37.439792ms (267.37 MB/s, 26710 rec/s)
  JSONReader (single chunk): 36.667208ms (273.00 MB/s, 27272 rec/s)
  GoccyJSONUnmarshal:        15.728292ms (636.45 MB/s, 63580 rec/s)
  GoccyJSONDecoder:          1.866923375s (636.45 MB/s, 63580 rec/s) // phwor
--- PASS: TestFileLabelsPayloads/tiny_payload (2.05s)
--- PASS: TestFileLabelsPayloads (2.05s)
PASS
```
So I think we've latched onto the root cause, and can show that RecordFromJSON
now handles these payloads gracefully. Here's the full benchmark:
```go
package main_test

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	gojson "github.com/goccy/go-json"
	"github.com/google/uuid"
)
// FileLabel represents the structure from the Python benchmark
type FileLabel struct {
	FilePath    string  `json:"file_path"`
	SomeLabel   string  `json:"some_label"`
	Confidence  float64 `json:"confidence"`
	ModelName   string  `json:"model_name"`
	ProcessedAt string  `json:"processed_at"`
	BatchID     string  `json:"batch_id"`
	Metadata    string  `json:"metadata"` // JSON string
}

// MetadataContent represents the metadata structure
type MetadataContent struct {
	ProcessingTimeMs int    `json:"processing_time_ms"`
	Version          string `json:"version"`
	Padding          string `json:"padding"`
	RecordID         int    `json:"record_id"`
	ExtraField1      string `json:"extra_field_1"`
	ExtraField2      string `json:"extra_field_2"`
	ExtraField3      string `json:"extra_field_3"`
}

// makePadding creates a padding string of the specified size in bytes.
// With the multi-byte alphabet the final slice can cut a rune in half;
// json.Marshal then substitutes U+FFFD, which is fine for this benchmark.
func makePadding(size int, useNonAZChars bool) string {
	alphabet := "abcdefghijklmnopqrstuvwxyz"
	if useNonAZChars {
		alphabet = "世界你好"
	}
	repeats := size/len(alphabet) + 1
	return strings.Repeat(alphabet, repeats)[:size]
}
// genPayload builds numRecords FileLabel records of roughly recordSizeBytes each.
func genPayload(numRecords int, recordSizeBytes int, useNonAZChars bool) []FileLabel {
	batchID := uuid.New().String()
	baseRecordSize := 200
	metadataSize := recordSizeBytes - baseRecordSize
	if metadataSize < 100 {
		metadataSize = 100
	}
	padding := makePadding(metadataSize, useNonAZChars)
	nowISO := time.Now().UTC().Format(time.RFC3339)
	payload := make([]FileLabel, numRecords)
	for i := 0; i < numRecords; i++ {
		metadata := MetadataContent{
			ProcessingTimeMs: 150 + (i % 100),
			Version:          "1.0",
			Padding:          padding,
			RecordID:         i,
			ExtraField1:      fmt.Sprintf("extra_value_%d", i),
			ExtraField2:      fmt.Sprintf("extra_value_%d", i*2),
			ExtraField3:      fmt.Sprintf("extra_value_%d", i*3),
		}
		metadataJSON, _ := json.Marshal(metadata)
		payload[i] = FileLabel{
			FilePath:    fmt.Sprintf("s3://test-bucket/batch-%s/file-%d.jpg", batchID, i),
			SomeLabel:   fmt.Sprintf("label_%d", i%10),
			Confidence:  0.85 + float64(i%15)/100.0,
			ModelName:   fmt.Sprintf("model_v%d", (i%5)+1),
			ProcessedAt: nowISO,
			BatchID:     batchID,
			Metadata:    string(metadataJSON),
		}
	}
	return payload
}
// payloadToNDJSON converts payload to newline-delimited JSON bytes
func payloadToNDJSON(payload []FileLabel) []byte {
	var buf bytes.Buffer
	encoder := json.NewEncoder(&buf)
	for _, record := range payload {
		_ = encoder.Encode(record) // writing to an in-memory buffer can't fail here
	}
	return buf.Bytes()
}

// payloadToJSONArray converts payload to JSON array format
func payloadToJSONArray(payload []FileLabel) []byte {
	data, _ := json.Marshal(payload)
	return data
}
// getFileLabelsSchema defines an Arrow schema matching the FileLabel structure.
func getFileLabelsSchema() *arrow.Schema {
	return arrow.NewSchema([]arrow.Field{
		{Name: "file_path", Type: arrow.BinaryTypes.String},
		{Name: "some_label", Type: arrow.BinaryTypes.String},
		{Name: "confidence", Type: arrow.PrimitiveTypes.Float64},
		{Name: "model_name", Type: arrow.BinaryTypes.String},
		{Name: "processed_at", Type: arrow.BinaryTypes.String},
		{Name: "batch_id", Type: arrow.BinaryTypes.String},
		{Name: "metadata", Type: arrow.BinaryTypes.String},
	}, nil)
}
func benchmarkRecordFromJSON(data []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()
	start := time.Now()
	record, _, err := array.RecordFromJSON(pool, schema, bytes.NewReader(data))
	duration := time.Since(start)
	if err != nil {
		return duration, 0, err
	}
	numRows := record.NumRows()
	record.Release()
	return duration, numRows, nil
}
func benchmarkGoccyJSONDecoder(jsonData []byte) (time.Duration, int, error) {
	start := time.Now()
	var records []FileLabel
	decoder := gojson.NewDecoder(bytes.NewReader(jsonData))
	if err := decoder.Decode(&records); err != nil {
		return 0, 0, err
	}
	return time.Since(start), len(records), nil
}

func benchmarkGoccyUnmarshal(jsonData []byte) (time.Duration, int, error) {
	start := time.Now()
	var records []FileLabel
	err := gojson.Unmarshal(jsonData, &records)
	return time.Since(start), len(records), err
}
func benchmarkJSONReader(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()
	start := time.Now()
	rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema, array.WithAllocator(pool))
	defer rdr.Release()
	var totalRows int64
	for rdr.Next() {
		rec := rdr.Record()
		totalRows += rec.NumRows()
	}
	if err := rdr.Err(); err != nil {
		return time.Since(start), totalRows, err
	}
	return time.Since(start), totalRows, nil
}
func benchmarkJSONReaderSingleChunk(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()
	start := time.Now()
	rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
		array.WithAllocator(pool),
		array.WithChunk(-1))
	defer rdr.Release()
	if !rdr.Next() {
		return time.Since(start), 0, fmt.Errorf("no record found")
	}
	rec := rdr.Record()
	numRows := rec.NumRows()
	return time.Since(start), numRows, nil
}
// BenchmarkScenario represents a test scenario
type BenchmarkScenario struct {
	Name            string
	NumRecords      int
	RecordSizeBytes int
	UseNonAZChars   bool
}
func TestFileLabelsPayloads(t *testing.T) {
	scenarios := []BenchmarkScenario{
		{
			Name:            "tiny payload",
			NumRecords:      1000,
			RecordSizeBytes: 10_240,
			UseNonAZChars:   false,
		},
		{
			Name:            "large_payload_50k_records",
			NumRecords:      50_000,
			RecordSizeBytes: 10_240,
			UseNonAZChars:   false,
		},
		{
			Name:            "4MB_large_payload_1000_records",
			NumRecords:      1000,
			RecordSizeBytes: 4_000_000,
			UseNonAZChars:   false,
		},
		{
			Name:            "large_payload_50k_records_non_az_chars",
			NumRecords:      50_000,
			RecordSizeBytes: 10_240,
			UseNonAZChars:   true,
		},
		{
			Name:            "4MB_large_payload_1000_records_non_az_chars",
			NumRecords:      1000,
			RecordSizeBytes: 4_000_000,
			UseNonAZChars:   true,
		},
	}
	schema := getFileLabelsSchema()
	fmt.Println("Spark Connect API Payload Benchmarks")
	fmt.Println("====================================")
	for _, scenario := range scenarios {
		t.Run(scenario.Name, func(t *testing.T) {
			// Generate payload
			payload := genPayload(scenario.NumRecords, scenario.RecordSizeBytes, scenario.UseNonAZChars)

			// Convert to both formats
			jsonArrayData := payloadToJSONArray(payload)
			ndjsonData := payloadToNDJSON(payload)
			jsonArraySizeMB := float64(len(jsonArrayData)) / (1024 * 1024)
			ndjsonSizeMB := float64(len(ndjsonData)) / (1024 * 1024)

			fmt.Printf("\n%s:\n", scenario.Name)
			fmt.Printf("  Records: %d\n", scenario.NumRecords)
			fmt.Printf("  Target record size: %d bytes\n", scenario.RecordSizeBytes)
			fmt.Printf("  JSON Array size: %.2f MB\n", jsonArraySizeMB)
			fmt.Printf("  NDJSON size: %.2f MB\n", ndjsonSizeMB)
			// Benchmark RecordFromJSON (expects JSON array)
			duration1, rows1, err1 := benchmarkRecordFromJSON(jsonArrayData, schema)
			if err1 != nil {
				t.Errorf("RecordFromJSON failed: %v", err1)
			} else {
				throughput1 := jsonArraySizeMB / duration1.Seconds()
				recordsPerSec1 := float64(rows1) / duration1.Seconds()
				fmt.Printf("  RecordFromJSON:            %12v (%.2f MB/s, %.0f rec/s)\n",
					duration1, throughput1, recordsPerSec1)
			}
			// Benchmark JSONReader with NDJSON (default chunking)
			duration2, rows2, err2 := benchmarkJSONReader(ndjsonData, schema)
			if err2 != nil {
				t.Errorf("JSONReader (chunked) failed: %v", err2)
			} else {
				throughput2 := ndjsonSizeMB / duration2.Seconds()
				recordsPerSec2 := float64(rows2) / duration2.Seconds()
				fmt.Printf("  JSONReader (chunked):      %12v (%.2f MB/s, %.0f rec/s)\n",
					duration2, throughput2, recordsPerSec2)
			}
			// Benchmark JSONReader with NDJSON (single chunk)
			duration3, rows3, err3 := benchmarkJSONReaderSingleChunk(ndjsonData, schema)
			if err3 != nil {
				t.Errorf("JSONReader (single chunk) failed: %v", err3)
			} else {
				throughput3 := ndjsonSizeMB / duration3.Seconds()
				recordsPerSec3 := float64(rows3) / duration3.Seconds()
				fmt.Printf("  JSONReader (single chunk): %12v (%.2f MB/s, %.0f rec/s)\n",
					duration3, throughput3, recordsPerSec3)
			}
			// Benchmark Goccy JSON deserialization (expects JSON array)
			duration4, rows4, err4 := benchmarkGoccyUnmarshal(jsonArrayData)
			if err4 != nil {
				t.Errorf("GoccyJSONUnmarshal failed: %v", err4)
			} else {
				throughput4 := jsonArraySizeMB / duration4.Seconds()
				recordsPerSec4 := float64(rows4) / duration4.Seconds()
				fmt.Printf("  GoccyJSONUnmarshal:        %12v (%.2f MB/s, %.0f rec/s)\n",
					duration4, throughput4, recordsPerSec4)
			}
			// Benchmark Goccy Decoder (expects JSON array)
			duration5, rows5, err5 := benchmarkGoccyJSONDecoder(jsonArrayData)
			if err5 != nil {
				t.Errorf("GoccyJSONDecoder failed: %v", err5)
			} else {
				throughput5 := jsonArraySizeMB / duration5.Seconds()
				recordsPerSec5 := float64(rows5) / duration5.Seconds()
				fmt.Printf("  GoccyJSONDecoder:          %12v (%.2f MB/s, %.0f rec/s)\n",
					duration5, throughput5, recordsPerSec5)
			}
		})
	}
}
```
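If you want to rerun this, it should drop into any module that pulls in `github.com/apache/arrow-go/v18`, `github.com/goccy/go-json`, and `github.com/google/uuid`, then `go test -v -run TestFileLabelsPayloads`. Fair warning: the 4 MB scenarios generate multi-GB payloads in memory.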
So it looks like the root cause has been identified. I think the only way this
could be improved further would be some way of enabling concurrent record
processing: there's no strong reason we can't use multiple goroutines, as long
as we can stitch the response back together with the right indices.
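To make that concrete, here's a rough sketch of the kind of fan-out I mean. `decodeNDJSONParallel` is a hypothetical helper, not part of this PR; it assumes NDJSON input so records can be split on newlines, reuses the `FileLabel` type from the benchmark above, and needs `sync` on top of the test file's imports:
```go
// decodeNDJSONParallel splits NDJSON on newlines, decodes each record on its
// own goroutine, and writes results by index so the output order matches the
// input without a separate stitching step.
func decodeNDJSONParallel(ndjson []byte, workers int) ([]FileLabel, error) {
	lines := bytes.Split(bytes.TrimSpace(ndjson), []byte{'\n'})
	out := make([]FileLabel, len(lines))
	errs := make(chan error, len(lines)) // buffered so no goroutine blocks
	sem := make(chan struct{}, workers)  // bounds concurrency
	var wg sync.WaitGroup
	for i, line := range lines {
		wg.Add(1)
		sem <- struct{}{}
		go func(i int, line []byte) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := gojson.Unmarshal(line, &out[i]); err != nil {
				errs <- fmt.Errorf("record %d: %w", i, err)
			}
		}(i, line)
	}
	wg.Wait()
	close(errs)
	// Report the first error observed, if any (nil from a drained channel).
	if err := <-errs; err != nil {
		return nil, err
	}
	return out, nil
}
```
The same index-addressed trick ought to work inside the Arrow JSON reader: decode record batches in parallel and write each batch into its own slot of the output.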
LGTM 🚀