This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 24c66fa8 feat(arrow/ipc): support custom_metadata on RecordBatch 
messages (#669)
24c66fa8 is described below

commit 24c66fa8bb79bd311be821627da0723a6428c84a
Author: Rusty Conover <[email protected]>
AuthorDate: Mon Feb 23 10:57:30 2026 -0500

    feat(arrow/ipc): support custom_metadata on RecordBatch messages (#669)
    
    ### Rationale for this change
    
    Arrow IPC Messages wrap RecordBatch data in a Message flatbuffer which
    has a custom_metadata field (vector of KeyValue pairs). PyArrow and
    other implementations use this to attach per-batch metadata, but the Go
    implementation previously ignored it on both read and write paths.
    
    ### What changes are included in this PR?
    
    Add a RecordBatchWithMetadata optional interface to avoid breaking the
    existing RecordBatch interface. The simpleRecord implementation carries
    metadata through NewSlice, SetColumn, and IPC round-trips. Includes
    PyArrow-generated test fixtures for interoperability validation.
    
    ### Are these changes tested?
    
    Yes, tests are included.
    
    ### Are there any user-facing changes?
    
    Yes, custom_metadata is now supported on RecordBatch messages.
    
    This change was developed with AI assistance and manually reviewed.
---
 arrow/array/record.go                            |  21 +-
 arrow/ipc/file_reader.go                         |   7 +-
 arrow/ipc/ipc_test.go                            | 257 +++++++++++++++++++++++
 arrow/ipc/metadata.go                            |  12 +-
 arrow/ipc/testdata/custom_metadata.arrow         | Bin 0 -> 1266 bytes
 arrow/ipc/testdata/custom_metadata_stream.arrows | Bin 0 -> 944 bytes
 arrow/ipc/writer.go                              |  11 +-
 arrow/record.go                                  |   8 +
 8 files changed, 303 insertions(+), 13 deletions(-)

diff --git a/arrow/array/record.go b/arrow/array/record.go
index cca79661..0aaf771b 100644
--- a/arrow/array/record.go
+++ b/arrow/array/record.go
@@ -121,6 +121,7 @@ type simpleRecord struct {
        refCount atomic.Int64
 
        schema *arrow.Schema
+       meta   arrow.Metadata
 
        rows int64
        arrs []arrow.Array
@@ -131,8 +132,19 @@ type simpleRecord struct {
 // NewRecordBatch panics if the columns and schema are inconsistent.
 // NewRecordBatch panics if rows is larger than the height of the columns.
 func NewRecordBatch(schema *arrow.Schema, cols []arrow.Array, nrows int64) 
arrow.RecordBatch {
+       return NewRecordBatchWithMetadata(schema, cols, nrows, arrow.Metadata{})
+}
+
+// NewRecordBatchWithMetadata returns a basic, non-lazy in-memory record batch
+// with custom metadata. The metadata is preserved during IPC serialization
+// at the Message level.
+//
+// NewRecordBatchWithMetadata panics if the columns and schema are 
inconsistent.
+// NewRecordBatchWithMetadata panics if rows is larger than the height of the 
columns.
+func NewRecordBatchWithMetadata(schema *arrow.Schema, cols []arrow.Array, 
nrows int64, meta arrow.Metadata) arrow.RecordBatch {
        rec := &simpleRecord{
                schema: schema,
+               meta:   meta,
                rows:   nrows,
                arrs:   make([]arrow.Array, len(cols)),
        }
@@ -189,7 +201,7 @@ func (rec *simpleRecord) SetColumn(i int, arr arrow.Array) 
(arrow.RecordBatch, e
        copy(arrs, rec.arrs)
        arrs[i] = arr
 
-       return NewRecordBatch(rec.schema, arrs, rec.rows), nil
+       return NewRecordBatchWithMetadata(rec.schema, arrs, rec.rows, 
rec.meta), nil
 }
 
 func (rec *simpleRecord) validate() error {
@@ -240,6 +252,7 @@ func (rec *simpleRecord) Release() {
 }
 
 func (rec *simpleRecord) Schema() *arrow.Schema    { return rec.schema }
+func (rec *simpleRecord) Metadata() arrow.Metadata { return rec.meta }
 func (rec *simpleRecord) NumRows() int64           { return rec.rows }
 func (rec *simpleRecord) NumCols() int64           { return 
int64(len(rec.arrs)) }
 func (rec *simpleRecord) Columns() []arrow.Array   { return rec.arrs }
@@ -262,7 +275,7 @@ func (rec *simpleRecord) NewSlice(i, j int64) 
arrow.RecordBatch {
                        arr.Release()
                }
        }()
-       return NewRecordBatch(rec.schema, arrs, j-i)
+       return NewRecordBatchWithMetadata(rec.schema, arrs, j-i, rec.meta)
 }
 
 func (rec *simpleRecord) String() string {
@@ -504,6 +517,6 @@ func IterFromReader(rdr RecordReader) 
iter.Seq2[arrow.RecordBatch, error] {
 }
 
 var (
-       _ arrow.RecordBatch = (*simpleRecord)(nil)
-       _ RecordReader      = (*simpleRecords)(nil)
+       _ arrow.RecordBatchWithMetadata = (*simpleRecord)(nil)
+       _ RecordReader                  = (*simpleRecords)(nil)
 )
diff --git a/arrow/ipc/file_reader.go b/arrow/ipc/file_reader.go
index d742d852..605b768e 100644
--- a/arrow/ipc/file_reader.go
+++ b/arrow/ipc/file_reader.go
@@ -458,6 +458,11 @@ func newRecordBatch(schema *arrow.Schema, memo 
*dictutils.Memo, meta *memory.Buf
                defer codec.Close()
        }
 
+       customMeta, err := metadataFromFB(msg)
+       if err != nil {
+               panic(err)
+       }
+
        ctx := &arrayLoaderContext{
                src: ipcSource{
                        meta:     &md,
@@ -488,7 +493,7 @@ func newRecordBatch(schema *arrow.Schema, memo 
*dictutils.Memo, meta *memory.Buf
                defer cols[i].Release()
        }
 
-       return array.NewRecordBatch(schema, cols, rows)
+       return array.NewRecordBatchWithMetadata(schema, cols, rows, customMeta)
 }
 
 type ipcSource struct {
diff --git a/arrow/ipc/ipc_test.go b/arrow/ipc/ipc_test.go
index a23b3476..93bd987a 100644
--- a/arrow/ipc/ipc_test.go
+++ b/arrow/ipc/ipc_test.go
@@ -22,6 +22,7 @@ import (
        "fmt"
        "io"
        "math/rand"
+       "os"
        "strconv"
        "strings"
        "testing"
@@ -688,3 +689,259 @@ func TestArrowBinaryIPCWriterTruncatedVOffsets(t 
*testing.T) {
 
        require.False(t, reader.Next())
 }
+
+func TestRecordBatchCustomMetadataRoundtrip(t *testing.T) {
+       mem := memory.NewGoAllocator()
+       schema := arrow.NewSchema(
+               []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+               nil,
+       )
+
+       bldr := array.NewInt32Builder(mem)
+       defer bldr.Release()
+       bldr.AppendValues([]int32{1, 2, 3}, nil)
+       col := bldr.NewArray()
+       defer col.Release()
+
+       meta := arrow.NewMetadata([]string{"k1", "k2"}, []string{"v1", "v2"})
+       rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 3, 
meta)
+       defer rec.Release()
+
+       // Write to IPC stream
+       var buf bytes.Buffer
+       writer := ipc.NewWriter(&buf, ipc.WithSchema(schema))
+       require.NoError(t, writer.Write(rec))
+       require.NoError(t, writer.Close())
+
+       // Read back
+       reader, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
+       require.NoError(t, err)
+       defer reader.Release()
+
+       require.True(t, reader.Next())
+       got := reader.RecordBatch()
+
+       rm, ok := got.(arrow.RecordBatchWithMetadata)
+       require.True(t, ok, "record batch should implement 
RecordBatchWithMetadata")
+
+       require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+       require.Equal(t, meta.Values(), rm.Metadata().Values())
+}
+
+func TestRecordBatchCustomMetadataFileRoundtrip(t *testing.T) {
+       mem := memory.NewGoAllocator()
+       schema := arrow.NewSchema(
+               []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+               nil,
+       )
+
+       bldr := array.NewInt32Builder(mem)
+       defer bldr.Release()
+       bldr.AppendValues([]int32{10, 20}, nil)
+       col := bldr.NewArray()
+       defer col.Release()
+
+       meta := arrow.NewMetadata([]string{"file_key"}, []string{"file_value"})
+       rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 2, 
meta)
+       defer rec.Release()
+
+       // Write to IPC file
+       var buf bytes.Buffer
+       writer, err := ipc.NewFileWriter(&buf, ipc.WithSchema(schema))
+       require.NoError(t, err)
+       require.NoError(t, writer.Write(rec))
+       require.NoError(t, writer.Close())
+
+       // Read back
+       reader, err := ipc.NewFileReader(bytes.NewReader(buf.Bytes()))
+       require.NoError(t, err)
+       defer reader.Close()
+
+       require.Equal(t, 1, reader.NumRecords())
+       got, err := reader.RecordBatchAt(0)
+       require.NoError(t, err)
+       defer got.Release()
+
+       rm, ok := got.(arrow.RecordBatchWithMetadata)
+       require.True(t, ok, "record batch should implement 
RecordBatchWithMetadata")
+
+       require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+       require.Equal(t, meta.Values(), rm.Metadata().Values())
+}
+
+func TestRecordBatchCustomMetadataInterop(t *testing.T) {
+       t.Run("file", func(t *testing.T) {
+               f, err := os.Open("testdata/custom_metadata.arrow")
+               require.NoError(t, err)
+               defer f.Close()
+
+               reader, err := ipc.NewFileReader(f)
+               require.NoError(t, err)
+               defer reader.Close()
+
+               // Verify schema metadata
+               schemaMeta := reader.Schema().Metadata()
+               idx := schemaMeta.FindKey("schema_key")
+               require.GreaterOrEqual(t, idx, 0)
+               require.Equal(t, "schema_value", schemaMeta.Values()[idx])
+
+               require.Equal(t, 2, reader.NumRecords())
+
+               // Batch 1
+               rec0, err := reader.RecordBatchAt(0)
+               require.NoError(t, err)
+               defer rec0.Release()
+               rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
+               require.True(t, ok)
+               m0 := rm0.Metadata()
+               require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
+               require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])
+
+               // Batch 2
+               rec1, err := reader.RecordBatchAt(1)
+               require.NoError(t, err)
+               defer rec1.Release()
+               rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
+               require.True(t, ok)
+               m1 := rm1.Metadata()
+               require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
+               require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])
+       })
+
+       t.Run("stream", func(t *testing.T) {
+               data, err := 
os.ReadFile("testdata/custom_metadata_stream.arrows")
+               require.NoError(t, err)
+
+               reader, err := ipc.NewReader(bytes.NewReader(data))
+               require.NoError(t, err)
+               defer reader.Release()
+
+               // Verify schema metadata
+               schemaMeta := reader.Schema().Metadata()
+               idx := schemaMeta.FindKey("schema_key")
+               require.GreaterOrEqual(t, idx, 0)
+               require.Equal(t, "schema_value", schemaMeta.Values()[idx])
+
+               // Batch 1
+               require.True(t, reader.Next())
+               rec0 := reader.RecordBatch()
+               rm0, ok := rec0.(arrow.RecordBatchWithMetadata)
+               require.True(t, ok)
+               m0 := rm0.Metadata()
+               require.Equal(t, "1", m0.Values()[m0.FindKey("batch_num")])
+               require.Equal(t, "value1", m0.Values()[m0.FindKey("key1")])
+
+               // Batch 2
+               require.True(t, reader.Next())
+               rec1 := reader.RecordBatch()
+               rm1, ok := rec1.(arrow.RecordBatchWithMetadata)
+               require.True(t, ok)
+               m1 := rm1.Metadata()
+               require.Equal(t, "2", m1.Values()[m1.FindKey("batch_num")])
+               require.Equal(t, "value2", m1.Values()[m1.FindKey("key2")])
+
+               require.False(t, reader.Next())
+       })
+}
+
+func TestRecordBatchCustomMetadataSlice(t *testing.T) {
+       mem := memory.NewGoAllocator()
+       schema := arrow.NewSchema(
+               []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+               nil,
+       )
+
+       bldr := array.NewInt32Builder(mem)
+       defer bldr.Release()
+       bldr.AppendValues([]int32{1, 2, 3, 4}, nil)
+       col := bldr.NewArray()
+       defer col.Release()
+
+       meta := arrow.NewMetadata([]string{"slice_key"}, 
[]string{"slice_value"})
+       rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col}, 4, 
meta)
+       defer rec.Release()
+
+       sliced := rec.NewSlice(1, 3)
+       defer sliced.Release()
+
+       rm, ok := sliced.(arrow.RecordBatchWithMetadata)
+       require.True(t, ok, "sliced record should implement 
RecordBatchWithMetadata")
+       require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+       require.Equal(t, meta.Values(), rm.Metadata().Values())
+       require.EqualValues(t, 2, sliced.NumRows())
+}
+
+func TestRecordBatchCustomMetadataSetColumn(t *testing.T) {
+       mem := memory.NewGoAllocator()
+       schema := arrow.NewSchema(
+               []arrow.Field{
+                       {Name: "x", Type: arrow.PrimitiveTypes.Int32},
+                       {Name: "y", Type: arrow.PrimitiveTypes.Int32},
+               },
+               nil,
+       )
+
+       bldr := array.NewInt32Builder(mem)
+       defer bldr.Release()
+       bldr.AppendValues([]int32{1, 2}, nil)
+       col1 := bldr.NewArray()
+       defer col1.Release()
+
+       bldr.AppendValues([]int32{3, 4}, nil)
+       col2 := bldr.NewArray()
+       defer col2.Release()
+
+       meta := arrow.NewMetadata([]string{"set_key"}, []string{"set_value"})
+       rec := array.NewRecordBatchWithMetadata(schema, []arrow.Array{col1, 
col2}, 2, meta)
+       defer rec.Release()
+
+       bldr.AppendValues([]int32{5, 6}, nil)
+       newCol := bldr.NewArray()
+       defer newCol.Release()
+
+       updated, err := rec.SetColumn(0, newCol)
+       require.NoError(t, err)
+       defer updated.Release()
+
+       rm, ok := updated.(arrow.RecordBatchWithMetadata)
+       require.True(t, ok, "updated record should implement 
RecordBatchWithMetadata")
+       require.Equal(t, meta.Keys(), rm.Metadata().Keys())
+       require.Equal(t, meta.Values(), rm.Metadata().Values())
+}
+
+func TestRecordBatchNoMetadataRoundtrip(t *testing.T) {
+       mem := memory.NewGoAllocator()
+       schema := arrow.NewSchema(
+               []arrow.Field{{Name: "x", Type: arrow.PrimitiveTypes.Int32}},
+               nil,
+       )
+
+       bldr := array.NewInt32Builder(mem)
+       defer bldr.Release()
+       bldr.AppendValues([]int32{1, 2, 3}, nil)
+       col := bldr.NewArray()
+       defer col.Release()
+
+       rec := array.NewRecordBatch(schema, []arrow.Array{col}, 3)
+       defer rec.Release()
+
+       // Write to IPC stream
+       var buf bytes.Buffer
+       writer := ipc.NewWriter(&buf, ipc.WithSchema(schema))
+       require.NoError(t, writer.Write(rec))
+       require.NoError(t, writer.Close())
+
+       // Read back
+       reader, err := ipc.NewReader(bytes.NewReader(buf.Bytes()))
+       require.NoError(t, err)
+       defer reader.Release()
+
+       require.True(t, reader.Next())
+       got := reader.RecordBatch()
+
+       rm, ok := got.(arrow.RecordBatchWithMetadata)
+       require.True(t, ok, "record batch should implement 
RecordBatchWithMetadata")
+
+       // Metadata should be empty, not nil
+       require.Equal(t, 0, rm.Metadata().Len())
+}
diff --git a/arrow/ipc/metadata.go b/arrow/ipc/metadata.go
index b83c1a84..91f37af7 100644
--- a/arrow/ipc/metadata.go
+++ b/arrow/ipc/metadata.go
@@ -1141,13 +1141,15 @@ func writeFBBuilder(b *flatbuffers.Builder, mem 
memory.Allocator) *memory.Buffer
        return buf
 }
 
-func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType 
flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64) *memory.Buffer {
+func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType 
flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64, customMetadata 
arrow.Metadata) *memory.Buffer {
+       metaFB := metadataToFB(b, customMetadata, 
flatbuf.MessageStartCustomMetadataVector)
 
        flatbuf.MessageStart(b)
        flatbuf.MessageAddVersion(b, 
flatbuf.MetadataVersion(currentMetadataVersion))
        flatbuf.MessageAddHeaderType(b, hdrType)
        flatbuf.MessageAddHeader(b, hdr)
        flatbuf.MessageAddBodyLength(b, bodyLen)
+       flatbuf.MessageAddCustomMetadata(b, metaFB)
        msg := flatbuf.MessageEnd(b)
        b.Finish(msg)
 
@@ -1157,7 +1159,7 @@ func writeMessageFB(b *flatbuffers.Builder, mem 
memory.Allocator, hdrType flatbu
 func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict 
*dictutils.Mapper) *memory.Buffer {
        b := flatbuffers.NewBuilder(1024)
        schemaFB := schemaToFB(b, schema, dict)
-       return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
+       return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0, 
arrow.Metadata{})
 }
 
 func writeFileFooter(schema *arrow.Schema, dicts, recs []dataBlock, w 
io.Writer) error {
@@ -1184,10 +1186,10 @@ func writeFileFooter(schema *arrow.Schema, dicts, recs 
[]dataBlock, w io.Writer)
        return err
 }
 
-func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields 
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, 
variadicCounts []int64) *memory.Buffer {
+func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields 
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, 
variadicCounts []int64, customMetadata arrow.Metadata) *memory.Buffer {
        b := flatbuffers.NewBuilder(0)
        recFB := recordToFB(b, size, bodyLength, fields, meta, codec, 
variadicCounts)
-       return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, 
bodyLength)
+       return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, 
bodyLength, customMetadata)
 }
 
 func writeDictionaryMessage(mem memory.Allocator, id int64, isDelta bool, 
size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata, codec 
flatbuf.CompressionType, variadicCounts []int64) *memory.Buffer {
@@ -1199,7 +1201,7 @@ func writeDictionaryMessage(mem memory.Allocator, id 
int64, isDelta bool, size,
        flatbuf.DictionaryBatchAddData(b, recFB)
        flatbuf.DictionaryBatchAddIsDelta(b, isDelta)
        dictFB := flatbuf.DictionaryBatchEnd(b)
-       return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch, 
dictFB, bodyLength)
+       return writeMessageFB(b, mem, flatbuf.MessageHeaderDictionaryBatch, 
dictFB, bodyLength, arrow.Metadata{})
 }
 
 func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields 
[]fieldMetadata, meta []bufferMetadata, codec flatbuf.CompressionType, 
variadicCounts []int64) flatbuffers.UOffsetT {
diff --git a/arrow/ipc/testdata/custom_metadata.arrow 
b/arrow/ipc/testdata/custom_metadata.arrow
new file mode 100644
index 00000000..c553dbc2
Binary files /dev/null and b/arrow/ipc/testdata/custom_metadata.arrow differ
diff --git a/arrow/ipc/testdata/custom_metadata_stream.arrows 
b/arrow/ipc/testdata/custom_metadata_stream.arrows
new file mode 100644
index 00000000..0aeef248
Binary files /dev/null and b/arrow/ipc/testdata/custom_metadata_stream.arrows 
differ
diff --git a/arrow/ipc/writer.go b/arrow/ipc/writer.go
index 29646ae2..fa5db526 100644
--- a/arrow/ipc/writer.go
+++ b/arrow/ipc/writer.go
@@ -1035,11 +1035,16 @@ func (w *recordEncoder) Encode(p *Payload, rec 
arrow.RecordBatch) error {
        if err := w.encode(p, rec); err != nil {
                return err
        }
-       return w.encodeMetadata(p, rec.NumRows())
+
+       var customMeta arrow.Metadata
+       if rm, ok := rec.(arrow.RecordBatchWithMetadata); ok {
+               customMeta = rm.Metadata()
+       }
+       return w.encodeMetadata(p, rec.NumRows(), customMeta)
 }
 
-func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64) error {
-       p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, 
w.codec, w.variadicCounts)
+func (w *recordEncoder) encodeMetadata(p *Payload, nrows int64, customMetadata 
arrow.Metadata) error {
+       p.meta = writeRecordMessage(w.mem, nrows, p.size, w.fields, w.meta, 
w.codec, w.variadicCounts, customMetadata)
        return nil
 }
 
diff --git a/arrow/record.go b/arrow/record.go
index 010dbe95..77ea1e18 100644
--- a/arrow/record.go
+++ b/arrow/record.go
@@ -48,6 +48,14 @@ type RecordBatch interface {
        NewSlice(i, j int64) RecordBatch
 }
 
+// RecordBatchWithMetadata is an optional interface for RecordBatch
+// implementations that support custom metadata. This metadata is
+// stored at the Message level in Arrow IPC format.
+type RecordBatchWithMetadata interface {
+       RecordBatch
+       Metadata() Metadata
+}
+
 // Record as a term typically refers to a single row, but this type represents 
a batch of rows, known in Arrow parlance
 // as a RecordBatch. This alias is provided for backwards compatibility.
 //

Reply via email to