This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-js.git
The following commit(s) were added to refs/heads/main by this push:
new f203cfb feat: Add custom metadata support for IPC messages and RecordBatch (#361)
f203cfb is described below
commit f203cfbaef82620633d0409dd1dc7e734195c788
Author: Rusty Conover <[email protected]>
AuthorDate: Fri Feb 20 20:27:20 2026 -0500
feat: Add custom metadata support for IPC messages and RecordBatch (#361)
## What's Changed
Decode and expose custom metadata from IPC message headers, propagating
it through the reader to RecordBatch instances. This allows accessing
per-batch metadata stored in Arrow IPC streams and files.
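
For example, once a file or stream has been read, each batch exposes its
message-level metadata as a `Map<string, string>`. A minimal sketch of the
reading side (the file path is illustrative):

```ts
import { readFileSync } from 'node:fs';
import { tableFromIPC } from 'apache-arrow';

// 'data.arrow' is a placeholder for any IPC file whose RecordBatch
// messages carry custom_metadata in their headers
const table = tableFromIPC(readFileSync('data.arrow'));
for (const batch of table.batches) {
    // Decoded from the message header; an empty Map if none was written
    console.log(batch.metadata);
}
```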
Extend RecordBatchWriter to support writing custom metadata to IPC
messages, similar to PyArrow's write_batch(batch, custom_metadata=...).
Changes:
- Update Message.from() to accept optional metadata parameter
- Update Message.encode() to serialize custom metadata to FlatBuffers
- Add customMetadata parameter to RecordBatchWriter.write()
- Add mergeMetadata() helper that combines batch.metadata with the
parameter (parameter takes precedence for duplicate keys)
- Add comprehensive integration tests for write/read round-trip
Usage:
writer.write(batch, new Map([['key', 'value']]));
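
A fuller round-trip, following the pattern exercised in the tests below
(schema and key names illustrative): metadata set directly on a batch is
encoded into its IPC message header and restored on read.

```ts
import {
    Field, Int32, makeData, RecordBatch,
    RecordBatchStreamWriter, Schema, Struct, tableFromIPC
} from 'apache-arrow';

// Build a minimal one-column batch
const schema = new Schema([new Field('id', new Int32())]);
const data = makeData({
    type: new Struct(schema.fields),
    length: 3,
    nullCount: 0,
    children: [makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) })]
});
const batch = new RecordBatch(schema, data);

// Metadata attached to the batch travels with its IPC message
batch.metadata.set('source', 'example');

const writer = new RecordBatchStreamWriter();
writer.write(batch);
writer.finish();

const table = tableFromIPC(writer.toUint8Array(true));
table.batches[0].metadata.get('source'); // 'example'
```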
---
src/ipc/metadata/message.ts | 40 +++-
src/ipc/reader.ts | 12 +-
src/ipc/writer.ts | 6 +-
src/recordbatch.ts | 25 ++-
test/data/test_message_metadata.arrow | Bin 0 -> 7154 bytes
test/unit/ipc/reader/message-metadata-tests.ts | 97 ++++++++++
test/unit/ipc/writer/message-metadata-tests.ts | 255 +++++++++++++++++++++++++
7 files changed, 416 insertions(+), 19 deletions(-)
diff --git a/src/ipc/metadata/message.ts b/src/ipc/metadata/message.ts
index b41ec4a..347b7c9 100644
--- a/src/ipc/metadata/message.ts
+++ b/src/ipc/metadata/message.ts
@@ -82,7 +82,8 @@ export class Message<T extends MessageHeader = any> {
const bodyLength: bigint = _message.bodyLength()!;
const version: MetadataVersion = _message.version();
const headerType: MessageHeader = _message.headerType();
- const message = new Message(bodyLength, version, headerType);
+ const metadata = decodeMessageCustomMetadata(_message);
+ const message = new Message(bodyLength, version, headerType, undefined, metadata);
message._createHeader = decodeMessageHeader(_message, headerType);
return message;
}
@@ -98,11 +99,24 @@ export class Message<T extends MessageHeader = any> {
} else if (message.isDictionaryBatch()) {
headerOffset = DictionaryBatch.encode(b, message.header() as DictionaryBatch);
}
+
+ // Encode custom metadata if present (must be done before startMessage)
+ const customMetadataOffset = !(message.metadata && message.metadata.size > 0) ? -1 :
+ _Message.createCustomMetadataVector(b, [...message.metadata].map(([k, v]) => {
+ const key = b.createString(`${k}`);
+ const val = b.createString(`${v}`);
+ _KeyValue.startKeyValue(b);
+ _KeyValue.addKey(b, key);
+ _KeyValue.addValue(b, val);
+ return _KeyValue.endKeyValue(b);
+ }));
+
_Message.startMessage(b);
_Message.addVersion(b, MetadataVersion.V5);
_Message.addHeader(b, headerOffset);
_Message.addHeaderType(b, message.headerType);
_Message.addBodyLength(b, BigInt(message.bodyLength));
+ if (customMetadataOffset !== -1) { _Message.addCustomMetadata(b, customMetadataOffset); }
_Message.finishMessageBuffer(b, _Message.endMessage(b));
return b.asUint8Array();
}
@@ -113,7 +127,7 @@ export class Message<T extends MessageHeader = any> {
return new Message(0, MetadataVersion.V5, MessageHeader.Schema, header);
}
if (header instanceof RecordBatch) {
- return new Message(bodyLength, MetadataVersion.V5, MessageHeader.RecordBatch, header);
+ return new Message(bodyLength, MetadataVersion.V5, MessageHeader.RecordBatch, header, header.metadata);
}
if (header instanceof DictionaryBatch) {
return new Message(bodyLength, MetadataVersion.V5, MessageHeader.DictionaryBatch, header);
@@ -126,24 +140,27 @@ export class Message<T extends MessageHeader = any> {
protected _bodyLength: number;
protected _version: MetadataVersion;
protected _compression: BodyCompression | null;
+ protected _metadata: Map<string, string>;
public get type() { return this.headerType; }
public get version() { return this._version; }
public get headerType() { return this._headerType; }
public get compression() { return this._compression; }
public get bodyLength() { return this._bodyLength; }
+ public get metadata() { return this._metadata; }
declare protected _createHeader: MessageHeaderDecoder;
public header() { return this._createHeader<T>(); }
public isSchema(): this is Message<MessageHeader.Schema> { return this.headerType === MessageHeader.Schema; }
public isRecordBatch(): this is Message<MessageHeader.RecordBatch> { return this.headerType === MessageHeader.RecordBatch; }
public isDictionaryBatch(): this is Message<MessageHeader.DictionaryBatch> { return this.headerType === MessageHeader.DictionaryBatch; }
- constructor(bodyLength: bigint | number, version: MetadataVersion, headerType: T, header?: any) {
+ constructor(bodyLength: bigint | number, version: MetadataVersion, headerType: T, header?: any, metadata?: Map<string, string>) {
this._version = version;
this._headerType = headerType;
this.body = new Uint8Array(0);
this._compression = header?.compression;
header && (this._createHeader = () => header);
this._bodyLength = bigIntToNumber(bodyLength);
+ this._metadata = metadata || new Map();
}
}
@@ -157,23 +174,27 @@ export class RecordBatch {
protected _buffers: BufferRegion[];
protected _compression: BodyCompression | null;
protected _variadicBufferCounts: number[];
+ protected _metadata: Map<string, string>;
public get nodes() { return this._nodes; }
public get length() { return this._length; }
public get buffers() { return this._buffers; }
public get compression() { return this._compression; }
public get variadicBufferCounts() { return this._variadicBufferCounts; }
+ public get metadata() { return this._metadata; }
constructor(
length: bigint | number,
nodes: FieldNode[],
buffers: BufferRegion[],
compression: BodyCompression | null,
- variadicBufferCounts: number[] = []
+ variadicBufferCounts: number[] = [],
+ metadata?: Map<string, string>
) {
this._nodes = nodes;
this._buffers = buffers;
this._length = bigIntToNumber(length);
this._compression = compression;
this._variadicBufferCounts = variadicBufferCounts;
+ this._metadata = metadata || new Map();
}
}
@@ -468,6 +489,17 @@ function decodeCustomMetadata(parent?: _Schema | _Field | null) {
return data;
}
+/** @ignore */
+function decodeMessageCustomMetadata(message: _Message) {
+ const data = new Map<string, string>();
+ for (let entry, key, i = -1, n = Math.trunc(message.customMetadataLength()); ++i < n;) {
+ if ((entry = message.customMetadata(i)) && (key = entry.key()) != null) {
+ data.set(key, entry.value()!);
+ }
+ }
+ return data;
+}
+
/** @ignore */
function decodeIndexType(_type: _Int) {
return new Int(_type.isSigned(), _type.bitWidth() as IntBitWidth);
diff --git a/src/ipc/reader.ts b/src/ipc/reader.ts
index af49f37..d0edafd 100644
--- a/src/ipc/reader.ts
+++ b/src/ipc/reader.ts
@@ -358,7 +358,7 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
return this;
}
- protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array): RecordBatch<T> {
+ protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array, messageMetadata?: Map<string, string>): RecordBatch<T> {
let children: Data<any>[];
if (header.compression != null) {
const codec = compressionRegistry.get(header.compression.type);
@@ -379,7 +379,7 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = any> implements RecordB
}
const data = makeData({ type: new Struct(this.schema.fields), length: header.length, children });
- return new RecordBatch(this.schema, data);
+ return new RecordBatch(this.schema, data, messageMetadata);
}
protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: Uint8Array) {
@@ -512,7 +512,7 @@ class RecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBatchRe
this._recordBatchIndex++;
const header = message.header();
const buffer = reader.readMessageBody(message.bodyLength);
- const recordBatch = this._loadRecordBatch(header, buffer);
+ const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
return { done: false, value: recordBatch };
} else if (message.isDictionaryBatch()) {
this._dictionaryIndex++;
@@ -587,7 +587,7 @@ class AsyncRecordBatchStreamReaderImpl<T extends TypeMap = any> extends RecordBa
this._recordBatchIndex++;
const header = message.header();
const buffer = await reader.readMessageBody(message.bodyLength);
- const recordBatch = this._loadRecordBatch(header, buffer);
+ const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
return { done: false, value: recordBatch };
} else if (message.isDictionaryBatch()) {
this._dictionaryIndex++;
@@ -640,7 +640,7 @@ class RecordBatchFileReaderImpl<T extends TypeMap = any> extends RecordBatchStre
if (message?.isRecordBatch()) {
const header = message.header();
const buffer = this._reader.readMessageBody(message.bodyLength);
- const recordBatch = this._loadRecordBatch(header, buffer);
+ const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
return recordBatch;
}
}
@@ -714,7 +714,7 @@ class AsyncRecordBatchFileReaderImpl<T extends TypeMap = any> extends AsyncRecor
if (message?.isRecordBatch()) {
const header = message.header();
const buffer = await this._reader.readMessageBody(message.bodyLength);
- const recordBatch = this._loadRecordBatch(header, buffer);
+ const recordBatch = this._loadRecordBatch(header, buffer, message.metadata);
return recordBatch;
}
}
diff --git a/src/ipc/writer.ts b/src/ipc/writer.ts
index 0b13fdf..7d783eb 100644
--- a/src/ipc/writer.ts
+++ b/src/ipc/writer.ts
@@ -185,6 +185,9 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
return this;
}
+ public write(payload?: Table<T> | RecordBatch<T> | Iterable<RecordBatch<T>> | null): void;
+ // Overload for UnderlyingSink compatibility (used by DOM streams)
+ public write(chunk: RecordBatch<T>, controller: WritableStreamDefaultController): void;
public write(payload?: Table<T> | RecordBatch<T> | Iterable<RecordBatch<T>> | null) {
let schema: Schema<T> | null = null;
@@ -275,7 +278,7 @@ export class RecordBatchWriter<T extends TypeMap = any> extends ReadableInterop<
protected _writeRecordBatch(batch: RecordBatch<T>) {
const { byteLength, nodes, bufferRegions, buffers, variadicBufferCounts } = this._assembleRecordBatch(batch);
- const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, this._compression, variadicBufferCounts);
+ const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, bufferRegions, this._compression, variadicBufferCounts, batch.metadata);
const message = Message.from(recordBatch, byteLength);
return this
._writeDictionaries(batch)
@@ -589,3 +592,4 @@ function recordBatchToJSON(records: RecordBatch) {
'columns': columns
}, null, 2);
}
+
diff --git a/src/recordbatch.ts b/src/recordbatch.ts
index cb4dbf9..7cdc8b3 100644
--- a/src/recordbatch.ts
+++ b/src/recordbatch.ts
@@ -61,9 +61,10 @@ export class RecordBatch<T extends TypeMap = any> {
declare public readonly [kRecordBatchSymbol]: true;
constructor(columns: { [P in keyof T]: Data<T[P]> });
- constructor(schema: Schema<T>, data?: Data<Struct<T>>);
+ constructor(schema: Schema<T>, data?: Data<Struct<T>>, metadata?: Map<string, string>);
constructor(...args: any[]) {
switch (args.length) {
+ case 3:
case 2: {
[this.schema] = args;
if (!(this.schema instanceof Schema)) {
@@ -74,7 +75,8 @@ export class RecordBatch<T extends TypeMap = any> {
nullCount: 0,
type: new Struct<T>(this.schema.fields),
children: this.schema.fields.map((f) => makeData({ type: f.type, nullCount: 0 }))
- })
+ }),
+ this._metadata = new Map()
] = args;
if (!(this.data instanceof Data)) {
throw new TypeError('RecordBatch constructor expects a [Schema, Data] pair.');
@@ -98,6 +100,7 @@ export class RecordBatch<T extends TypeMap = any> {
const schema = new Schema<T>(fields);
const data = makeData({ type: new Struct<T>(fields), length, children, nullCount: 0 });
[this.schema, this.data] = ensureSameLengthData<T>(schema, data.children as Data<T[keyof T]>[], length);
+ this._metadata = new Map();
break;
}
default: throw new TypeError('RecordBatch constructor expects an Object mapping names to child Data, or a [Schema, Data] pair.');
@@ -105,10 +108,16 @@ export class RecordBatch<T extends TypeMap = any> {
}
protected _dictionaries?: Map<number, Vector>;
+ protected _metadata: Map<string, string>;
public readonly schema: Schema<T>;
public readonly data: Data<Struct<T>>;
+ /**
+ * Custom metadata for this RecordBatch.
+ */
+ public get metadata() { return this._metadata; }
+
public get dictionaries() {
return this._dictionaries || (this._dictionaries = collectDictionaries(this.schema.fields, this.data.children));
}
@@ -202,7 +211,7 @@ export class RecordBatch<T extends TypeMap = any> {
*/
public slice(begin?: number, end?: number): RecordBatch<T> {
const [slice] = new Vector([this.data]).slice(begin, end).data;
- return new RecordBatch(this.schema, slice);
+ return new RecordBatch(this.schema, slice, this._metadata);
}
/**
@@ -254,7 +263,7 @@ export class RecordBatch<T extends TypeMap = any> {
schema = new Schema(fields, new Map(this.schema.metadata));
data = makeData({ type: new Struct<T>(fields), children });
}
- return new RecordBatch(schema, data);
+ return new RecordBatch(schema, data, this._metadata);
}
/**
@@ -273,7 +282,7 @@ export class RecordBatch<T extends TypeMap = any> {
children[index] = this.data.children[index] as Data<T[K]>;
}
}
- return new RecordBatch(schema, makeData({ type, length: this.numRows, children }));
+ return new RecordBatch(schema, makeData({ type, length: this.numRows, children }), this._metadata);
}
/**
@@ -286,7 +295,7 @@ export class RecordBatch<T extends TypeMap = any> {
const schema = this.schema.selectAt<K>(columnIndices);
const children = columnIndices.map((i) => this.data.children[i]).filter(Boolean);
const subset = makeData({ type: new Struct(schema.fields), length: this.numRows, children });
- return new RecordBatch<{ [P in keyof K]: K[P] }>(schema, subset);
+ return new RecordBatch<{ [P in keyof K]: K[P] }>(schema, subset, this._metadata);
}
// Initialize this static property via an IIFE so bundlers don't tree-shake
@@ -369,9 +378,9 @@ function collectDictionaries(fields: Field[], children: readonly Data[], diction
* @private
*/
export class _InternalEmptyPlaceholderRecordBatch<T extends TypeMap = any> extends RecordBatch<T> {
- constructor(schema: Schema<T>) {
+ constructor(schema: Schema<T>, metadata?: Map<string, string>) {
const children = schema.fields.map((f) => makeData({ type: f.type }));
const data = makeData({ type: new Struct<T>(schema.fields), nullCount: 0, children });
- super(schema, data);
+ super(schema, data, metadata || new Map());
}
}
diff --git a/test/data/test_message_metadata.arrow b/test/data/test_message_metadata.arrow
new file mode 100644
index 0000000..2dd9e82
Binary files /dev/null and b/test/data/test_message_metadata.arrow differ
diff --git a/test/unit/ipc/reader/message-metadata-tests.ts b/test/unit/ipc/reader/message-metadata-tests.ts
new file mode 100644
index 0000000..ecfc0a1
--- /dev/null
+++ b/test/unit/ipc/reader/message-metadata-tests.ts
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { readFileSync } from 'node:fs';
+import path from 'node:path';
+import { tableFromIPC, RecordBatch } from 'apache-arrow';
+
+// Path to the test file with message-level metadata
+// Use process.cwd() since tests are run from project root
+const testFilePath = path.resolve(process.cwd(), 'test/data/test_message_metadata.arrow');
+
+describe('RecordBatch message metadata', () => {
+ const buffer = readFileSync(testFilePath);
+ const table = tableFromIPC(buffer);
+
+ test('should read RecordBatch metadata from IPC file', () => {
+ expect(table.batches).toHaveLength(3);
+
+ for (let i = 0; i < table.batches.length; i++) {
+ const batch = table.batches[i];
+ expect(batch).toBeInstanceOf(RecordBatch);
+ expect(batch.metadata).toBeInstanceOf(Map);
+ expect(batch.metadata.size).toBeGreaterThan(0);
+
+ // Verify specific metadata keys exist
+ expect(batch.metadata.has('batch_index')).toBe(true);
+ expect(batch.metadata.has('batch_id')).toBe(true);
+ expect(batch.metadata.has('producer')).toBe(true);
+
+ // Verify batch_index matches the batch position
+ expect(batch.metadata.get('batch_index')).toBe(String(i));
+ expect(batch.metadata.get('batch_id')).toBe(`batch_${String(i).padStart(4, '0')}`);
+ }
+ });
+
+ test('should read unicode metadata values', () => {
+ const batch = table.batches[0];
+ expect(batch.metadata.has('unicode_test')).toBe(true);
+ expect(batch.metadata.get('unicode_test')).toBe('Hello 世界 🌍 مرحبا');
+ });
+
+ test('should handle empty metadata values', () => {
+ const batch = table.batches[0];
+ expect(batch.metadata.has('optional_field')).toBe(true);
+ expect(batch.metadata.get('optional_field')).toBe('');
+ });
+
+ test('should read JSON metadata values', () => {
+ const batch = table.batches[0];
+ expect(batch.metadata.has('batch_info_json')).toBe(true);
+ const jsonStr = batch.metadata.get('batch_info_json')!;
+ const parsed = JSON.parse(jsonStr);
+ expect(parsed.batch_number).toBe(0);
+ expect(parsed.processing_stage).toBe('final');
+ expect(parsed.tags).toEqual(['validated', 'complete']);
+ });
+
+ describe('metadata preservation', () => {
+ test('should preserve metadata through slice()', () => {
+ const batch = table.batches[0];
+ const sliced = batch.slice(0, 2);
+ expect(sliced.metadata).toBeInstanceOf(Map);
+ expect(sliced.metadata.size).toBe(batch.metadata.size);
+ expect(sliced.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+ });
+
+ test('should preserve metadata through select()', () => {
+ const batch = table.batches[0];
+ const selected = batch.select(['id', 'name']);
+ expect(selected.metadata).toBeInstanceOf(Map);
+ expect(selected.metadata.size).toBe(batch.metadata.size);
+ expect(selected.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+ });
+
+ test('should preserve metadata through selectAt()', () => {
+ const batch = table.batches[0];
+ const selectedAt = batch.selectAt([0, 1]);
+ expect(selectedAt.metadata).toBeInstanceOf(Map);
+ expect(selectedAt.metadata.size).toBe(batch.metadata.size);
+ expect(selectedAt.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+ });
+ });
+});
diff --git a/test/unit/ipc/writer/message-metadata-tests.ts b/test/unit/ipc/writer/message-metadata-tests.ts
new file mode 100644
index 0000000..ccc800b
--- /dev/null
+++ b/test/unit/ipc/writer/message-metadata-tests.ts
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+ Field,
+ Int32,
+ makeData,
+ RecordBatch,
+ RecordBatchFileWriter,
+ RecordBatchStreamWriter,
+ Schema,
+ Struct,
+ tableFromIPC,
+ Utf8
+} from 'apache-arrow';
+
+describe('RecordBatch message metadata writing', () => {
+
+ // Helper to create a simple RecordBatch for testing
+ function createTestBatch(): RecordBatch {
+ const schema = new Schema([
+ new Field('id', new Int32()),
+ new Field('name', new Utf8())
+ ]);
+ const idData = makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) });
+ const nameData = makeData({ type: new Utf8(), data: Buffer.from('foobarbaz'), valueOffsets: new Int32Array([0, 3, 6, 9]) });
+ const structData = makeData({
+ type: new Struct(schema.fields),
+ length: 3,
+ nullCount: 0,
+ children: [idData, nameData]
+ });
+ return new RecordBatch(schema, structData);
+ }
+
+ describe('Stream format round-trip', () => {
+ test('should write and read metadata via RecordBatchStreamWriter', () => {
+ const batch = createTestBatch();
+ batch.metadata.set('batch_id', '123');
+ batch.metadata.set('source', 'test');
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+ const buffer = writer.toUint8Array(true);
+
+ const table = tableFromIPC(buffer);
+ expect(table.batches).toHaveLength(1);
+ expect(table.batches[0].metadata).toBeInstanceOf(Map);
+ expect(table.batches[0].metadata.get('batch_id')).toBe('123');
+ expect(table.batches[0].metadata.get('source')).toBe('test');
+ });
+
+ test('should write batch without metadata when none provided', () => {
+ const batch = createTestBatch();
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+ const buffer = writer.toUint8Array(true);
+
+ const table = tableFromIPC(buffer);
+ expect(table.batches).toHaveLength(1);
+ expect(table.batches[0].metadata).toBeInstanceOf(Map);
+ expect(table.batches[0].metadata.size).toBe(0);
+ });
+ });
+
+ describe('File format round-trip', () => {
+ test('should write and read metadata via RecordBatchFileWriter', () => {
+ const batch = createTestBatch();
+ batch.metadata.set('format', 'file');
+ batch.metadata.set('version', '1.0');
+
+ const writer = new RecordBatchFileWriter();
+ writer.write(batch);
+ writer.finish();
+ const buffer = writer.toUint8Array(true);
+
+ const table = tableFromIPC(buffer);
+ expect(table.batches).toHaveLength(1);
+ expect(table.batches[0].metadata.get('format')).toBe('file');
+ expect(table.batches[0].metadata.get('version')).toBe('1.0');
+ });
+ });
+
+ describe('Multiple batches with different metadata', () => {
+ test('should write multiple batches with different metadata', () => {
+ const writer = new RecordBatchStreamWriter();
+
+ const batch1 = createTestBatch();
+ batch1.metadata.set('batch_index', '0');
+ batch1.metadata.set('tag', 'first');
+
+ const batch2 = createTestBatch();
+ batch2.metadata.set('batch_index', '1');
+ batch2.metadata.set('tag', 'middle');
+
+ const batch3 = createTestBatch();
+ batch3.metadata.set('batch_index', '2');
+ batch3.metadata.set('tag', 'last');
+
+ writer.write(batch1);
+ writer.write(batch2);
+ writer.write(batch3);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ expect(table.batches).toHaveLength(3);
+ expect(table.batches[0].metadata.get('batch_index')).toBe('0');
+ expect(table.batches[0].metadata.get('tag')).toBe('first');
+ expect(table.batches[1].metadata.get('batch_index')).toBe('1');
+ expect(table.batches[1].metadata.get('tag')).toBe('middle');
+ expect(table.batches[2].metadata.get('batch_index')).toBe('2');
+ expect(table.batches[2].metadata.get('tag')).toBe('last');
+ });
+ });
+
+ describe('Metadata preservation through operations', () => {
+ test('should preserve metadata through slice after round-trip', () => {
+ const batch = createTestBatch();
+ batch.metadata.set('key', 'value');
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ const sliced = table.batches[0].slice(0, 2);
+
+ expect(sliced.metadata.get('key')).toBe('value');
+ });
+
+ test('should preserve metadata through selectAt after round-trip', () => {
+ const batch = createTestBatch();
+ batch.metadata.set('preserved', 'yes');
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ const selected = table.batches[0].selectAt([0]);
+
+ expect(selected.metadata.get('preserved')).toBe('yes');
+ });
+ });
+
+ describe('Metadata from constructor', () => {
+ test('should use metadata passed to RecordBatch constructor', () => {
+ const schema = new Schema([new Field('id', new Int32())]);
+ const idData = makeData({ type: new Int32(), data: new Int32Array([1, 2, 3]) });
+ const structData = makeData({
+ type: new Struct(schema.fields),
+ length: 3,
+ nullCount: 0,
+ children: [idData]
+ });
+ const batch = new RecordBatch(schema, structData, new Map([['from_batch', 'value']]));
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ expect(table.batches[0].metadata.get('from_batch')).toBe('value');
+ });
+ });
+
+ describe('Edge cases', () => {
+ test('should handle empty metadata map', () => {
+ const batch = createTestBatch();
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ expect(table.batches[0].metadata.size).toBe(0);
+ });
+
+ test('should handle unicode keys and values', () => {
+ const batch = createTestBatch();
+ batch.metadata.set('日本語キー', 'Japanese key');
+ batch.metadata.set('emoji', '🎉🚀💻');
+ batch.metadata.set('mixed', 'Hello 世界 مرحبا');
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ expect(table.batches[0].metadata.get('日本語キー')).toBe('Japanese key');
+ expect(table.batches[0].metadata.get('emoji')).toBe('🎉🚀💻');
+ expect(table.batches[0].metadata.get('mixed')).toBe('Hello 世界 مرحبا');
+ });
+
+ test('should handle empty string values', () => {
+ const batch = createTestBatch();
+ batch.metadata.set('empty_value', '');
+ batch.metadata.set('normal', 'value');
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ expect(table.batches[0].metadata.get('empty_value')).toBe('');
+ expect(table.batches[0].metadata.get('normal')).toBe('value');
+ });
+
+ test('should handle JSON string as metadata value', () => {
+ const batch = createTestBatch();
+ const jsonValue = JSON.stringify({ nested: { data: [1, 2, 3] }, flag: true });
+ batch.metadata.set('json_data', jsonValue);
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ const retrieved = table.batches[0].metadata.get('json_data')!;
+ const parsed = JSON.parse(retrieved);
+ expect(parsed.nested.data).toEqual([1, 2, 3]);
+ expect(parsed.flag).toBe(true);
+ });
+
+ test('should handle long metadata values', () => {
+ const batch = createTestBatch();
+ const longValue = 'x'.repeat(10000);
+ batch.metadata.set('long_value', longValue);
+
+ const writer = new RecordBatchStreamWriter();
+ writer.write(batch);
+ writer.finish();
+
+ const table = tableFromIPC(writer.toUint8Array(true));
+ expect(table.batches[0].metadata.get('long_value')).toBe(longValue);
+ });
+ });
+});