caldempsey commented on PR #449:
URL: https://github.com/apache/arrow-go/pull/449#issuecomment-3117362847
```
large_payload_50k_records:
Records: 50000
Target record size: 10240 bytes
JSON Array size: 500.93 MB
NDJSON size: 500.93 MB
JSONReader (chunked): 1.267049792s (395.35 MB/s, 39462 rec/s)
JSONReader (single chunk): 1.058549125s (473.22 MB/s, 47234 rec/s)
RecordFromJSON: 1.043343541s (480.12 MB/s, 47923 rec/s)
Speedup vs RecordFromJSON:
JSONReader (chunked): 0.82x
JSONReader (single chunk): 0.99x
--- PASS: TestFileLabelsPayloads/large_payload_50k_records (4.69s)
=== RUN TestFileLabelsPayloads/4MB_payload_1000_records
4MB_payload_1000_records:
Records: 1000
Target record size: 4000000 bytes
JSON Array size: 3814.94 MB
NDJSON size: 3814.94 MB
JSONReader (chunked): 9.64786775s (395.42 MB/s, 104 rec/s)
JSONReader (single chunk): 8.009190083s (476.32 MB/s, 125 rec/s)
RecordFromJSON: 7.984174833s (477.81 MB/s, 125 rec/s)
Speedup vs RecordFromJSON:
JSONReader (chunked): 0.83x
JSONReader (single chunk): 1.00x
--- PASS: TestFileLabelsPayloads/4MB_payload_1000_records (35.29s)
--- PASS: TestFileLabelsPayloads (39.97s)
PASS
```
Ran a test using data that's not quite production data, but closer to it. The use case
involves ingesting transcripts of audio files via Spark Connect. These
transcripts can be quite large, up to 4MB each. We're streaming this data into
a data lake using Spark Connect instead of traditional Spark jobs, which allows
us to receive synchronous responses at the call site and support coordination
patterns (GET/POST) using standard REST consumers (which is _so_ much better
for error handling). Hence the massive strings.
Feel free to yoink this benchmark into your own tests — it should be a
fairly comparable setup.
```go
package main_test

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/google/uuid"
)

// FileLabel represents the structure from the Python benchmark
type FileLabel struct {
	FilePath    string  `json:"file_path"`
	SomeLabel   string  `json:"some_label"`
	Confidence  float64 `json:"confidence"`
	ModelName   string  `json:"model_name"`
	ProcessedAt string  `json:"processed_at"`
	BatchID     string  `json:"batch_id"`
	Metadata    string  `json:"metadata"` // JSON string
}

// MetadataContent represents the metadata structure
type MetadataContent struct {
	ProcessingTimeMs int    `json:"processing_time_ms"`
	Version          string `json:"version"`
	Padding          string `json:"padding"`
	RecordID         int    `json:"record_id"`
	ExtraField1      string `json:"extra_field_1"`
	ExtraField2      string `json:"extra_field_2"`
	ExtraField3      string `json:"extra_field_3"`
}

// makePadding creates a padding string of the specified size
func makePadding(size int) string {
	alphabet := "abcdefghijklmnopqrstuvwxyz"
	repeats := size/len(alphabet) + 1
	return strings.Repeat(alphabet, repeats)[:size]
}

// generateLargePayload generates the same payload structure as the Python benchmark
func generateLargePayload(numRecords int, recordSizeBytes int) []FileLabel {
	batchID := uuid.New().String()
	baseRecordSize := 200
	metadataSize := recordSizeBytes - baseRecordSize
	if metadataSize < 100 {
		metadataSize = 100
	}
	padding := makePadding(metadataSize)
	nowISO := time.Now().UTC().Format(time.RFC3339)
	payload := make([]FileLabel, numRecords)
	for i := 0; i < numRecords; i++ {
		metadata := MetadataContent{
			ProcessingTimeMs: 150 + (i % 100),
			Version:          "1.0",
			Padding:          padding,
			RecordID:         i,
			ExtraField1:      fmt.Sprintf("extra_value_%d", i),
			ExtraField2:      fmt.Sprintf("extra_value_%d", i*2),
			ExtraField3:      fmt.Sprintf("extra_value_%d", i*3),
		}
		metadataJSON, _ := json.Marshal(metadata)
		payload[i] = FileLabel{
			FilePath:    fmt.Sprintf("s3://test-bucket/batch-%s/file-%d.jpg", batchID, i),
			SomeLabel:   fmt.Sprintf("label_%d", i%10),
			Confidence:  0.85 + float64(i%15)/100.0,
			ModelName:   fmt.Sprintf("model_v%d", (i%5)+1),
			ProcessedAt: nowISO,
			BatchID:     batchID,
			Metadata:    string(metadataJSON),
		}
	}
	return payload
}

// payloadToNDJSON converts payload to newline-delimited JSON bytes
func payloadToNDJSON(payload []FileLabel) []byte {
	var buf bytes.Buffer
	encoder := json.NewEncoder(&buf)
	for _, record := range payload {
		encoder.Encode(record)
	}
	return buf.Bytes()
}

// payloadToJSONArray converts payload to JSON array format
func payloadToJSONArray(payload []FileLabel) []byte {
	data, _ := json.Marshal(payload)
	return data
}

// Define Arrow schema matching the FileLabel structure
func getFileLabelsSchema() *arrow.Schema {
	return arrow.NewSchema([]arrow.Field{
		{Name: "file_path", Type: arrow.BinaryTypes.String},
		{Name: "some_label", Type: arrow.BinaryTypes.String},
		{Name: "confidence", Type: arrow.PrimitiveTypes.Float64},
		{Name: "model_name", Type: arrow.BinaryTypes.String},
		{Name: "processed_at", Type: arrow.BinaryTypes.String},
		{Name: "batch_id", Type: arrow.BinaryTypes.String},
		{Name: "metadata", Type: arrow.BinaryTypes.String},
	}, nil)
}

func benchmarkRecordFromJSON(data []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()
	start := time.Now()
	record, _, err := array.RecordFromJSON(pool, schema, bytes.NewReader(data))
	duration := time.Since(start)
	if err != nil {
		return duration, 0, err
	}
	numRows := record.NumRows()
	record.Release()
	return duration, numRows, nil
}

func benchmarkJSONReader(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()
	start := time.Now()
	rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema, array.WithAllocator(pool))
	defer rdr.Release()
	var totalRows int64
	for rdr.Next() {
		rec := rdr.Record()
		totalRows += rec.NumRows()
	}
	if err := rdr.Err(); err != nil {
		return time.Since(start), totalRows, err
	}
	duration := time.Since(start)
	return duration, totalRows, nil
}

func benchmarkJSONReaderSingleChunk(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
	pool := memory.NewGoAllocator()
	start := time.Now()
	rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
		array.WithAllocator(pool),
		array.WithChunk(-1))
	defer rdr.Release()
	if !rdr.Next() {
		return time.Since(start), 0, fmt.Errorf("no record found")
	}
	rec := rdr.Record()
	numRows := rec.NumRows()
	duration := time.Since(start)
	return duration, numRows, nil
}

// BenchmarkScenario represents a test scenario
type BenchmarkScenario struct {
	Name            string
	NumRecords      int
	RecordSizeBytes int
}

func TestFileLabelsPayloads(t *testing.T) {
	scenarios := []BenchmarkScenario{
		{
			Name:            "large_payload_50k_records",
			NumRecords:      50_000,
			RecordSizeBytes: 10_240,
		},
		{
			Name:            "4MB_payload_1000_records",
			NumRecords:      1000,
			RecordSizeBytes: 4000000,
		},
	}
	schema := getFileLabelsSchema()

	fmt.Println("Spark Connect API Payload Benchmarks")
	fmt.Println("====================================")

	for _, scenario := range scenarios {
		t.Run(scenario.Name, func(t *testing.T) {
			// Generate payload
			payload := generateLargePayload(scenario.NumRecords, scenario.RecordSizeBytes)

			// Convert to both formats
			jsonArrayData := payloadToJSONArray(payload)
			ndjsonData := payloadToNDJSON(payload)
			jsonArraySizeMB := float64(len(jsonArrayData)) / (1024 * 1024)
			ndjsonSizeMB := float64(len(ndjsonData)) / (1024 * 1024)

			fmt.Printf("\n%s:\n", scenario.Name)
			fmt.Printf(" Records: %d\n", scenario.NumRecords)
			fmt.Printf(" Target record size: %d bytes\n", scenario.RecordSizeBytes)
			fmt.Printf(" JSON Array size: %.2f MB\n", jsonArraySizeMB)
			fmt.Printf(" NDJSON size: %.2f MB\n", ndjsonSizeMB)

			// Benchmark JSONReader with NDJSON (default chunking)
			duration2, rows2, err2 := benchmarkJSONReader(ndjsonData, schema)
			if err2 != nil {
				t.Errorf("JSONReader (chunked) failed: %v", err2)
			} else {
				throughput2 := ndjsonSizeMB / duration2.Seconds()
				recordsPerSec2 := float64(rows2) / duration2.Seconds()
				fmt.Printf(" JSONReader (chunked): %12v (%.2f MB/s, %.0f rec/s)\n",
					duration2, throughput2, recordsPerSec2)
			}

			// Benchmark JSONReader with NDJSON (single chunk)
			duration3, rows3, err3 := benchmarkJSONReaderSingleChunk(ndjsonData, schema)
			if err3 != nil {
				t.Errorf("JSONReader (single chunk) failed: %v", err3)
			} else {
				throughput3 := ndjsonSizeMB / duration3.Seconds()
				recordsPerSec3 := float64(rows3) / duration3.Seconds()
				fmt.Printf(" JSONReader (single chunk): %12v (%.2f MB/s, %.0f rec/s)\n",
					duration3, throughput3, recordsPerSec3)
			}

			// Benchmark RecordFromJSON (expects JSON array)
			duration1, rows1, err1 := benchmarkRecordFromJSON(jsonArrayData, schema)
			if err1 != nil {
				t.Errorf("RecordFromJSON failed: %v", err1)
			} else {
				throughput1 := jsonArraySizeMB / duration1.Seconds()
				recordsPerSec1 := float64(rows1) / duration1.Seconds()
				fmt.Printf(" RecordFromJSON: %12v (%.2f MB/s, %.0f rec/s)\n",
					duration1, throughput1, recordsPerSec1)
			}

			if err1 == nil && err2 == nil && err3 == nil {
				fmt.Printf("\n Speedup vs RecordFromJSON:\n")
				fmt.Printf(" JSONReader (chunked): %.2fx\n", float64(duration1)/float64(duration2))
				fmt.Printf(" JSONReader (single chunk): %.2fx\n", float64(duration1)/float64(duration3))
			}
		})
	}
}
```
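If you'd rather get `go test -bench` numbers than the wall-clock prints above, a minimal `testing.B` wrapper (my own naming, not part of this PR, and it assumes it sits in the same file as the helpers above) could look like:
```go
// Hypothetical wrapper: drives the 50k x 10KiB payload through the standard
// benchmark harness, reusing generateLargePayload/payloadToJSONArray/getFileLabelsSchema.
func BenchmarkRecordFromJSON50k(b *testing.B) {
	payload := generateLargePayload(50_000, 10_240)
	data := payloadToJSONArray(payload)
	schema := getFileLabelsSchema()
	pool := memory.NewGoAllocator()

	b.SetBytes(int64(len(data))) // so the harness reports MB/s alongside ns/op
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		rec, _, err := array.RecordFromJSON(pool, schema, bytes.NewReader(data))
		if err != nil {
			b.Fatal(err)
		}
		rec.Release()
	}
}
```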
I think the only way this might be improved within the scope of my use case is
to enable concurrent record processing: if we're consuming NDJSON, there's not
much reason we can't use multiple threads, as long as we can stitch the results
back together with the right indices?
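Roughly what I'm picturing, as a sketch only (the function names are mine, nothing in this PR does this, and it assumes the same imports as the benchmark above plus `sync`): split the NDJSON buffer on line boundaries, give each shard its own `JSONReader`, and index the results by shard so concatenating them preserves the original record order.
```go
// splitNDJSON cuts the buffer into roughly equal shards, each ending on a '\n',
// so every shard contains only whole NDJSON records.
func splitNDJSON(data []byte, parts int) [][]byte {
	if parts < 1 {
		parts = 1
	}
	target := len(data)/parts + 1
	var shards [][]byte
	for len(data) > 0 {
		end := target
		if end >= len(data) {
			end = len(data)
		} else if nl := bytes.IndexByte(data[end:], '\n'); nl >= 0 {
			end += nl + 1 // extend the shard to the end of the current line
		} else {
			end = len(data)
		}
		shards = append(shards, data[:end])
		data = data[end:]
	}
	return shards
}

// parallelNDJSON decodes each shard in its own goroutine and returns one record
// per shard, in shard order, so the caller can stitch them back together.
func parallelNDJSON(ndjson []byte, schema *arrow.Schema, workers int) ([]arrow.Record, error) {
	shards := splitNDJSON(ndjson, workers)
	recs := make([]arrow.Record, len(shards)) // index i == shard i, order preserved
	errs := make([]error, len(shards))

	var wg sync.WaitGroup
	for i, shard := range shards {
		wg.Add(1)
		go func(i int, shard []byte) {
			defer wg.Done()
			rdr := array.NewJSONReader(bytes.NewReader(shard), schema, array.WithChunk(-1))
			defer rdr.Release()
			if rdr.Next() {
				rec := rdr.Record()
				rec.Retain() // keep the record alive after the reader is released
				recs[i] = rec
			}
			errs[i] = rdr.Err()
		}(i, shard)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return recs, nil // iterate (or concatenate) in slice order at the call site
}
```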
LGTM 🚀