This is an automated email from the ASF dual-hosted git repository.
kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 92734673d1 GH-49620: [Ruby] Add support for custom metadata in Message
(#49621)
92734673d1 is described below
commit 92734673d1c9dea033fb30844278d321e82a7874
Author: Sutou Kouhei <[email protected]>
AuthorDate: Wed Apr 1 12:01:36 2026 +0900
GH-49620: [Ruby] Add support for custom metadata in Message (#49621)
### Rationale for this change
`Message` can attach custom metadata:
https://github.com/apache/arrow/blob/61ef6722b01a289ca6e09d3c5aaef782f060f94c/format/Message.fbs#L152-L157
### What changes are included in this PR?
* Add `ArrowFormat::Dictionary`, which holds a dictionary's ID, its array and
the custom metadata of the message that carried it
* Add support for reading custom metadata from `Message` and attaching it
as `message_metadata` to `ArrowFormat::Schema`, `ArrowFormat::RecordBatch` and
`ArrowFormat::Dictionary`
* Add support for writing the `message_metadata` of `ArrowFormat::Schema`,
`ArrowFormat::RecordBatch` and `ArrowFormat::Dictionary` as the `Message`'s custom
metadata
### Are these changes tested?
Yes.
### Are there any user-facing changes?
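Yes. `ArrowFormat::Schema`, `ArrowFormat::RecordBatch` and
`ArrowFormat::Dictionary` have a new `message_metadata` attribute, and the
dictionaries of a dictionary array are now `ArrowFormat::Dictionary` objects
(use `#array` to get the underlying array) instead of raw arrays.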
* GitHub Issue: #49620
Authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: Sutou Kouhei <[email protected]>
---
ruby/red-arrow-format/lib/arrow-format/array.rb | 2 +-
.../lib/arrow-format/{schema.rb => dictionary.rb} | 31 +++------
ruby/red-arrow-format/lib/arrow-format/field.rb | 9 +--
.../lib/arrow-format/file-reader.rb | 16 ++++-
.../lib/arrow-format/file-writer.rb | 9 +--
.../lib/arrow-format/flatbuffers.rb | 13 ++++
ruby/red-arrow-format/lib/arrow-format/readable.rb | 16 +++--
.../lib/arrow-format/record-batch.rb | 4 +-
ruby/red-arrow-format/lib/arrow-format/schema.rb | 13 ++--
.../lib/arrow-format/streaming-pull-reader.rb | 39 +++++++----
.../lib/arrow-format/streaming-writer.rb | 22 +++---
ruby/red-arrow-format/test/test-writer.rb | 81 ++++++++++++++++++++--
12 files changed, 170 insertions(+), 85 deletions(-)
diff --git a/ruby/red-arrow-format/lib/arrow-format/array.rb
b/ruby/red-arrow-format/lib/arrow-format/array.rb
index 10be36d530..cb71a4d255 100644
--- a/ruby/red-arrow-format/lib/arrow-format/array.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/array.rb
@@ -738,7 +738,7 @@ module ArrowFormat
values = []
@dictionaries.each do |dictionary|
- values.concat(dictionary.to_a)
+ values.concat(dictionary.array.to_a)
end
indices.collect do |index|
if index.nil?
diff --git a/ruby/red-arrow-format/lib/arrow-format/schema.rb
b/ruby/red-arrow-format/lib/arrow-format/dictionary.rb
similarity index 54%
copy from ruby/red-arrow-format/lib/arrow-format/schema.rb
copy to ruby/red-arrow-format/lib/arrow-format/dictionary.rb
index ef2a1b9be3..3e8b64d9d2 100644
--- a/ruby/red-arrow-format/lib/arrow-format/schema.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/dictionary.rb
@@ -1,3 +1,4 @@
+# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
@@ -15,28 +16,14 @@
# under the License.
module ArrowFormat
- class Schema
- attr_reader :fields
- attr_reader :metadata
- def initialize(fields, metadata: nil)
- @fields = fields
- @metadata = metadata
- end
-
- def to_flatbuffers
- fb_schema = FB::Schema::Data.new
- fb_schema.endianness = FB::Endianness::LITTLE
- fb_schema.fields = fields.collect(&:to_flatbuffers)
- if @metadata
- fb_schema.custom_metadata = @metadata.collect do |key, value|
- fb_key_value = FB::KeyValue::Data.new
- fb_key_value.key = key
- fb_key_value.value = value
- fb_key_value
- end
- end
- # fb_schema.features = @features
- fb_schema
+ class Dictionary
+ attr_reader :id
+ attr_reader :array
+ attr_reader :message_metadata
+ def initialize(id, array, message_metadata: nil)
+ @id = id
+ @array = array
+ @message_metadata = message_metadata
end
end
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/field.rb
b/ruby/red-arrow-format/lib/arrow-format/field.rb
index 28a4a90b09..b7e3ce4b5a 100644
--- a/ruby/red-arrow-format/lib/arrow-format/field.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/field.rb
@@ -47,14 +47,7 @@ module ArrowFormat
elsif @type.respond_to?(:children)
fb_field.children = @type.children.collect(&:to_flatbuffers)
end
- if @metadata
- fb_field.custom_metadata = @metadata.collect do |key, value|
- fb_key_value = FB::KeyValue::Data.new
- fb_key_value.key = key
- fb_key_value.value = value
- fb_key_value
- end
- end
+ fb_field.custom_metadata = FB.build_custom_metadata(@metadata)
fb_field
end
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
index cec3711096..9aa258f242 100644
--- a/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/file-reader.rb
@@ -66,7 +66,11 @@ module ArrowFormat
"Not a record batch message: #{i}: " +
fb_header.class.name)
end
- read_record_batch(fb_message.version, fb_header, @schema, body)
+ read_record_batch(fb_message.version,
+ fb_header,
+ fb_message.custom_metadata,
+ @schema,
+ body)
end
def each
@@ -116,6 +120,7 @@ module ArrowFormat
# message_pull_reader = MessagePullReader.new do |message, body|
# return read_record_batch(message.version,
# message.header,
+ # message.custom_metadata,
# @schema,
# body)
# end
@@ -207,12 +212,17 @@ module ArrowFormat
schema = Schema.new([Field.new("dummy", value_type)])
record_batch = read_record_batch(fb_message.version,
fb_header.data,
+ nil,
schema,
body)
+ message_metadata = read_custom_metadata(fb_message.custom_metadata)
+ dictionary = Dictionary.new(fb_header.id,
+ record_batch.columns[0],
+ message_metadata: message_metadata)
if fb_header.delta?
- dictionaries[id] << record_batch.columns[0]
+ dictionaries[id] << dictionary
else
- dictionaries[id] = [record_batch.columns[0]]
+ dictionaries[id] = [dictionary]
end
end
dictionaries
diff --git a/ruby/red-arrow-format/lib/arrow-format/file-writer.rb
b/ruby/red-arrow-format/lib/arrow-format/file-writer.rb
index 2ac4695180..b1f3f2fab4 100644
--- a/ruby/red-arrow-format/lib/arrow-format/file-writer.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/file-writer.rb
@@ -43,14 +43,7 @@ module ArrowFormat
fb_footer.schema = @fb_schema
fb_footer.dictionaries = @fb_dictionary_blocks
fb_footer.record_batches = @fb_record_batch_blocks
- if metadata
- fb_footer.custom_metadata = metadata.collect do |key, value|
- fb_key_value = FB::KeyValue::Data.new
- fb_key_value.key = key
- fb_key_value.value = value
- fb_key_value
- end
- end
+ fb_footer.custom_metadata = FB.build_custom_metadata(metadata)
FB::Footer.serialize(fb_footer)
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/flatbuffers.rb
b/ruby/red-arrow-format/lib/arrow-format/flatbuffers.rb
index 913ae9b536..e56e848adb 100644
--- a/ruby/red-arrow-format/lib/arrow-format/flatbuffers.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/flatbuffers.rb
@@ -51,4 +51,17 @@ require_relative "org/apache/arrow/flatbuf/utf8"
module ArrowFormat
FB = Org::Apache::Arrow::Flatbuf
+
+ class << FB
+ def build_custom_metadata(custom_metadata)
+ return nil if custom_metadata.nil?
+
+ custom_metadata.collect do |key, value|
+ fb_key_value = FB::KeyValue::Data.new
+ fb_key_value.key = key
+ fb_key_value.value = value
+ fb_key_value
+ end
+ end
+ end
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/readable.rb
b/ruby/red-arrow-format/lib/arrow-format/readable.rb
index 783b494b68..acce023dc8 100644
--- a/ruby/red-arrow-format/lib/arrow-format/readable.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/readable.rb
@@ -34,12 +34,14 @@ module ArrowFormat
metadata
end
- def read_schema(fb_schema)
+ def read_schema(fb_schema, fb_message_custom_metadata=nil)
fields = fb_schema.fields.collect do |fb_field|
read_field(fb_field)
end
+ message_metadata = read_custom_metadata(fb_message_custom_metadata)
Schema.new(fields,
- metadata: read_custom_metadata(fb_schema.custom_metadata))
+ metadata: read_custom_metadata(fb_schema.custom_metadata),
+ message_metadata: message_metadata)
end
def read_field(fb_field,
@@ -226,14 +228,20 @@ module ArrowFormat
end
end
- def read_record_batch(version, fb_record_batch, schema, body)
+ def read_record_batch(version,
+ fb_record_batch,
+ fb_message_custom_metadata,
+ schema,
+ body)
+ message_metadata = read_custom_metadata(fb_message_custom_metadata)
n_rows = fb_record_batch.length
nodes = fb_record_batch.nodes
buffers = fb_record_batch.buffers
columns = schema.fields.collect do |field|
read_column(version, field, nodes, buffers, body)
end
- RecordBatch.new(schema, n_rows, columns)
+ RecordBatch.new(schema, n_rows, columns,
+ message_metadata: message_metadata)
end
def read_column(version, field, nodes, buffers, body)
diff --git a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
index 938f02d762..23de1b2058 100644
--- a/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/record-batch.rb
@@ -25,10 +25,12 @@ module ArrowFormat
alias_method :size, :n_rows
alias_method :length, :n_rows
attr_reader :columns
- def initialize(schema, n_rows, columns)
+ attr_reader :message_metadata
+ def initialize(schema, n_rows, columns, message_metadata: nil)
@schema = schema
@n_rows = n_rows
@columns = columns
+ @message_metadata = message_metadata
end
def empty?
diff --git a/ruby/red-arrow-format/lib/arrow-format/schema.rb
b/ruby/red-arrow-format/lib/arrow-format/schema.rb
index ef2a1b9be3..eb054ad546 100644
--- a/ruby/red-arrow-format/lib/arrow-format/schema.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/schema.rb
@@ -18,23 +18,18 @@ module ArrowFormat
class Schema
attr_reader :fields
attr_reader :metadata
- def initialize(fields, metadata: nil)
+ attr_reader :message_metadata
+ def initialize(fields, metadata: nil, message_metadata: nil)
@fields = fields
@metadata = metadata
+ @message_metadata = message_metadata
end
def to_flatbuffers
fb_schema = FB::Schema::Data.new
fb_schema.endianness = FB::Endianness::LITTLE
fb_schema.fields = fields.collect(&:to_flatbuffers)
- if @metadata
- fb_schema.custom_metadata = @metadata.collect do |key, value|
- fb_key_value = FB::KeyValue::Data.new
- fb_key_value.key = key
- fb_key_value.value = value
- fb_key_value
- end
- end
+ fb_schema.custom_metadata = FB.build_custom_metadata(@metadata)
# fb_schema.features = @features
fb_schema
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
index 13e7ad7243..6ec3a24ccd 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-pull-reader.rb
@@ -16,6 +16,7 @@
# under the License.
require_relative "array"
+require_relative "dictionary"
require_relative "error"
require_relative "field"
require_relative "readable"
@@ -211,7 +212,7 @@ module ArrowFormat
header.inspect)
end
- @schema = read_schema(header)
+ @schema = read_schema(header, message.custom_metadata)
@dictionaries = {}
@dictionary_fields = {}
@schema.fields.each do |field|
@@ -225,24 +226,29 @@ module ArrowFormat
end
end
- def process_dictionary_batch_message(message, body)
- header = message.header
- if @state == :initial_dictionaries and header.delta?
+ def process_dictionary_batch_message(fb_message, body)
+ fb_header = fb_message.header
+ if @state == :initial_dictionaries and fb_header.delta?
raise ReadError.new("An initial dictionary batch message must be " +
"a non delta dictionary batch message: " +
- header.inspect)
+ fb_header.inspect)
end
- field = @dictionary_fields[header.id]
+ field = @dictionary_fields[fb_header.id]
value_type = field.type.value_type
schema = Schema.new([Field.new("dummy", value_type)])
- record_batch = read_record_batch(message.version,
- header.data,
+ record_batch = read_record_batch(fb_message.version,
+ fb_header.data,
+ nil,
schema,
body)
- if header.delta?
- @dictionaries[header.id] << record_batch.columns[0]
+ message_metadata = read_custom_metadata(fb_message.custom_metadata)
+ dictionary = Dictionary.new(fb_header.id,
+ record_batch.columns[0],
+ message_metadata: message_metadata)
+ if fb_header.delta?
+ @dictionaries[fb_header.id] << dictionary
else
- @dictionaries[header.id] = [record_batch.columns[0]]
+ @dictionaries[fb_header.id] = [dictionary]
end
end
@@ -250,9 +256,14 @@ module ArrowFormat
@dictionaries[id]
end
- def process_record_batch_message(message, body)
- header = message.header
- @on_read.call(read_record_batch(message.version, header, @schema, body))
+ def process_record_batch_message(fb_message, body)
+ fb_header = fb_message.header
+ record_batch = read_record_batch(fb_message.version,
+ fb_header,
+ fb_message.custom_metadata,
+ @schema,
+ body)
+ @on_read.call(record_batch)
end
end
end
diff --git a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
index 18eb2dda3a..8399663b72 100644
--- a/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
+++ b/ruby/red-arrow-format/lib/arrow-format/streaming-writer.rb
@@ -35,7 +35,8 @@ module ArrowFormat
end
def start(schema)
- write_message(build_metadata(schema.to_flatbuffers))
+ write_message(build_metadata(schema.to_flatbuffers,
+ custom_metadata: schema.message_metadata))
end
def write_record_batch(record_batch)
@@ -75,11 +76,12 @@ module ArrowFormat
write_data(padding(padding_size)) if padding_size > 0
end
- def build_metadata(header, body_length=0)
+ def build_metadata(header, body_length=0, custom_metadata: nil)
fb_message = FB::Message::Data.new
fb_message.version = FB::MetadataVersion::V5
fb_message.header = header
fb_message.body_length = body_length
+ fb_message.custom_metadata = FB.build_custom_metadata(custom_metadata)
metadata = FB::Message.serialize(fb_message)
metadata_size = metadata.bytesize
padding_size = compute_padding_size(metadata_size, ALIGNMENT_SIZE)
@@ -93,7 +95,8 @@ module ArrowFormat
record_batch.all_buffers_enumerator.each do |buffer|
body_length += aligned_buffer_size(buffer) if buffer
end
- metadata = build_metadata(fb_header, body_length)
+ metadata = build_metadata(fb_header, body_length,
+ custom_metadata: record_batch.message_metadata)
fb_block = FB::Block::Data.new
fb_block.offset = @offset
fb_block.meta_data_length =
@@ -113,21 +116,24 @@ module ArrowFormat
value_type = dictionary_array.type.value_type
base_offset = 0
dictionary_array.dictionaries.each do |dictionary|
+ data = dictionary.array
written_offset = @written_dictionary_offsets[id] || 0
current_base_offset = base_offset
- next_base_offset = base_offset + dictionary.size
+ next_base_offset = base_offset + data.size
base_offset = next_base_offset
next if next_base_offset <= written_offset
is_delta = (not written_offset.zero?)
if current_base_offset < written_offset
- dictionary = dictionary.slice(written_offset - current_base_offset)
+ data = data.slice(written_offset - current_base_offset)
end
schema = Schema.new([Field.new("dummy", value_type)])
- size = dictionary.size
- record_batch = RecordBatch.new(schema, size, [dictionary])
+ size = data.size
+ record_batch =
+ RecordBatch.new(schema, size, [data],
+ message_metadata: dictionary.message_metadata)
fb_dictionary_batch = FB::DictionaryBatch::Data.new
fb_dictionary_batch.id = id
fb_dictionary_batch.data = record_batch.to_flatbuffers
@@ -135,7 +141,7 @@ module ArrowFormat
write_record_batch_based_message(record_batch,
fb_dictionary_batch,
@fb_dictionary_blocks)
- @written_dictionary_offsets[id] = written_offset + dictionary.size
+ @written_dictionary_offsets[id] = written_offset + data.size
end
end
diff --git a/ruby/red-arrow-format/test/test-writer.rb
b/ruby/red-arrow-format/test/test-writer.rb
index 55b3c22b7a..9fc1443ccb 100644
--- a/ruby/red-arrow-format/test/test-writer.rb
+++ b/ruby/red-arrow-format/test/test-writer.rb
@@ -193,7 +193,8 @@ module WriterHelper
when ArrowFormat::DictionaryType
validity_buffer = convert_buffer(red_arrow_array.null_bitmap)
indices_buffer = convert_buffer(red_arrow_array.indices.data_buffer)
- dictionary = convert_array(red_arrow_array.dictionary)
+ dictionary_array = convert_array(red_arrow_array.dictionary)
+ dictionary = ArrowFormat::Dictionary.new(0, dictionary_array)
type.build_array(red_arrow_array.size,
validity_buffer,
indices_buffer,
@@ -287,6 +288,27 @@ module WriterTests
table.schema.metadata)
end
+ def test_custom_metadata_message_record_batch
+ field = ArrowFormat::Field.new("value", ArrowFormat::BooleanType.new)
+ schema = ArrowFormat::Schema.new([field])
+ column = convert_array(Arrow::BooleanArray.new([true, nil, false]))
+ record_batch = ArrowFormat::RecordBatch.new(schema, 3, [column],
+ message_metadata: {
+ "key1" => "value1",
+ "key2" => "value2",
+ })
+ output = StringIO.new(+"".b)
+ writer = writer_class.new(output)
+ write(writer, record_batch)
+ writer.finish
+ reader = reader_class.new(output.string)
+ assert_equal({
+ "key1" => "value1",
+ "key2" => "value2",
+ },
+ reader.first.message_metadata)
+ end
+
def test_null
array = Arrow::NullArray.new(3)
type, values = roundtrip(array)
@@ -944,6 +966,30 @@ module FileWriterTests
end
end
+module StreamingWriterTests
+ def test_custom_metadata_message_schema
+ field = ArrowFormat::Field.new("value", ArrowFormat::BooleanType.new)
+ schema = ArrowFormat::Schema.new([field],
+ message_metadata: {
+ "key1" => "value1",
+ "key2" => "value2",
+ })
+ column = convert_array(Arrow::BooleanArray.new([true, nil, false]))
+ record_batch = ArrowFormat::RecordBatch.new(schema, 3, [column])
+ output = StringIO.new(+"".b)
+ writer = writer_class.new(output)
+ write(writer, record_batch)
+ writer.finish
+ input = StringIO.new(output.string)
+ reader = reader_class.new(input)
+ assert_equal({
+ "key1" => "value1",
+ "key2" => "value2",
+ },
+ reader.schema.message_metadata)
+ end
+end
+
module WriterDictionaryDeltaTests
def build_schema(value_type)
index_type = ArrowFormat::Int32Type.singleton
@@ -971,11 +1017,19 @@ module WriterDictionaryDeltaTests
schema = build_schema(value_type)
type = schema.fields[0].type
+ dictionary_id = 1
+
# The first record batch with new dictionary.
raw_dictionary = values1.uniq
red_arrow_dictionary =
red_arrow_value_type.build_array(raw_dictionary)
- dictionary = convert_array(red_arrow_dictionary)
+ dictionary_array = convert_array(red_arrow_dictionary)
+ dictionary =
+ ArrowFormat::Dictionary.new(dictionary_id, dictionary_array,
+ message_metadata: {
+ "key1" => "value1",
+ "key2" => "value2",
+ })
indices1 = values1.collect do |value|
raw_dictionary.index(value)
end
@@ -990,7 +1044,9 @@ module WriterDictionaryDeltaTests
raw_dictionary_more = raw_dictionary + raw_dictionary_delta
red_arrow_dictionary_delta =
red_arrow_value_type.build_array(raw_dictionary_delta)
- dictionary_delta = convert_array(red_arrow_dictionary_delta)
+ dictionary_array_delta = convert_array(red_arrow_dictionary_delta)
+ dictionary_delta =
+ ArrowFormat::Dictionary.new(dictionary_id, dictionary_array_delta)
indices2 = values2.collect do |value|
raw_dictionary_more.index(value)
end
@@ -1002,7 +1058,9 @@ module WriterDictionaryDeltaTests
raw_dictionary_more = raw_dictionary | values2.uniq
red_arrow_dictionary_more =
red_arrow_value_type.build_array(raw_dictionary_more)
- dictionary_more = convert_array(red_arrow_dictionary_more)
+ dictionary_array_more = convert_array(red_arrow_dictionary_more)
+ dictionary_more = ArrowFormat::Dictionary.new(dictionary_id,
+ dictionary_array_more)
indices2 = values2.collect do |value|
raw_dictionary_more.index(value)
end
@@ -1518,10 +1576,14 @@ class TestFileWriter < Test::Unit::TestCase
ArrowFormat::FileWriter
end
+ def reader_class
+ ArrowFormat::FileReader
+ end
+
def read(path)
File.open(path, "rb") do |input|
- reader = ArrowFormat::FileReader.new(input)
- reader.to_a.collect do |record_batch|
+ reader = reader_class.new(input)
+ reader.collect do |record_batch|
record_batch.to_h.tap do |hash|
hash.each do |key, value|
hash[key] = value.to_a
@@ -1564,9 +1626,13 @@ class TestStreamingWriter < Test::Unit::TestCase
ArrowFormat::StreamingWriter
end
+ def reader_class
+ ArrowFormat::StreamingReader
+ end
+
def read(path)
File.open(path, "rb") do |input|
- reader = ArrowFormat::StreamingReader.new(input)
+ reader = reader_class.new(input)
reader.collect do |record_batch|
record_batch.to_h.tap do |hash|
hash.each do |key, value|
@@ -1579,6 +1645,7 @@ class TestStreamingWriter < Test::Unit::TestCase
sub_test_case("Basic") do
include WriterTests
+ include StreamingWriterTests
end
sub_test_case("Dictionary: delta") do