This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a2671e9c1a [python] Rename DataBlobWriter to DedicatedFormatWriter and
support blob+vector splitting (#8027)
a2671e9c1a is described below
commit a2671e9c1ad7abb33ca51541060af24d0a9ea36e
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri May 29 11:42:21 2026 +0800
[python] Rename DataBlobWriter to DedicatedFormatWriter and support
blob+vector splitting (#8027)
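DataBlobWriter now handles normal + blob + vector columns (matching
Java's DedicatedFormatRollingFileWriter), so rename it to
DedicatedFormatWriter. Also add data evolution format tests covering
the parquet, blob, and vortex (vector) paths, and make FormatVortexReader
cast all string_view/binary_view columns to string/binary instead of
only the predicate-referenced ones.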
---
.../pypaimon/read/reader/format_vortex_reader.py | 28 +-
paimon-python/pypaimon/tests/blob_table_test.py | 104 +-
.../pypaimon/tests/data_evolution_formats_test.py | 1063 ++++++++++++++++++++
paimon-python/pypaimon/write/file_store_write.py | 4 +-
...a_blob_writer.py => dedicated_format_writer.py} | 169 ++--
5 files changed, 1224 insertions(+), 144 deletions(-)
diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py
b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
index ac48ca1abe..fcd50a24f0 100644
--- a/paimon-python/pypaimon/read/reader/format_vortex_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
@@ -86,29 +86,23 @@ class FormatVortexReader(RecordBatchReader):
PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields
else None
)
- # Collect predicate-referenced fields for targeted view type casting
- self._cast_fields = predicate_fields if predicate_fields and
vortex_expr is not None else set()
-
@staticmethod
- def _cast_view_types(batch: RecordBatch, target_fields: Set[str]) ->
RecordBatch:
- """Cast string_view/binary_view columns to string/binary, only for
target fields."""
- if not target_fields:
- return batch
+ def _cast_view_types(batch: RecordBatch) -> RecordBatch:
+ """Cast all string_view/binary_view columns to string/binary."""
columns = []
fields = []
changed = False
for i in range(batch.num_columns):
col = batch.column(i)
field = batch.schema.field(i)
- if field.name in target_fields:
- if col.type == pa.string_view():
- col = col.cast(pa.utf8())
- field = field.with_type(pa.utf8())
- changed = True
- elif col.type == pa.binary_view():
- col = col.cast(pa.binary())
- field = field.with_type(pa.binary())
- changed = True
+ if col.type == pa.string_view():
+ col = col.cast(pa.utf8())
+ field = field.with_type(pa.utf8())
+ changed = True
+ elif col.type == pa.binary_view():
+ col = col.cast(pa.binary())
+ field = field.with_type(pa.binary())
+ changed = True
columns.append(col)
fields.append(field)
if changed:
@@ -118,7 +112,7 @@ class FormatVortexReader(RecordBatchReader):
def read_arrow_batch(self) -> Optional[RecordBatch]:
try:
batch = next(self.record_batch_reader)
- batch = self._cast_view_types(batch, self._cast_fields)
+ batch = self._cast_view_types(batch)
if not self.missing_fields:
return batch
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index 7261503450..042c47699a 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -29,8 +29,8 @@ from pypaimon.table.file_store_table import FileStoreTable
from pypaimon.write.commit_message import CommitMessage
-class DataBlobWriterTest(unittest.TestCase):
- """Tests for DataBlobWriter functionality with paimon table operations."""
+class DedicatedFormatWriterTest(unittest.TestCase):
+ """Tests for DedicatedFormatWriter functionality with paimon table
operations."""
@classmethod
def setUpClass(cls):
@@ -51,8 +51,8 @@ class DataBlobWriterTest(unittest.TestCase):
except OSError:
pass
- def test_data_blob_writer_basic_functionality(self):
- """Test basic DataBlobWriter functionality with paimon table."""
+ def test_dedicated_format_writer_basic_functionality(self):
+ """Test basic DedicatedFormatWriter functionality with paimon table."""
from pypaimon import Schema
# Create schema with normal and blob columns
@@ -82,7 +82,7 @@ class DataBlobWriterTest(unittest.TestCase):
'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3']
}, schema=pa_schema)
- # Test DataBlobWriter initialization using proper table API
+ # Test DedicatedFormatWriter initialization using proper table API
# Use proper table API to create writer
write_builder = table.new_batch_write_builder()
blob_writer = write_builder.new_write()
@@ -108,8 +108,8 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.close()
- def test_data_blob_writer_schema_detection(self):
- """Test that DataBlobWriter correctly detects blob columns from
schema."""
+ def test_dedicated_format_writer_schema_detection(self):
+ """Test that DedicatedFormatWriter correctly detects blob columns from
schema."""
from pypaimon import Schema
# Test schema with blob column
@@ -132,7 +132,7 @@ class DataBlobWriterTest(unittest.TestCase):
write_builder = table.new_batch_write_builder()
blob_writer = write_builder.new_write()
- # Test that DataBlobWriter was created internally
+ # Test that DedicatedFormatWriter was created internally
# We can verify this by checking the internal data writers
test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
@@ -142,19 +142,19 @@ class DataBlobWriterTest(unittest.TestCase):
# Write data to trigger writer creation
blob_writer.write_arrow(test_data)
- # Verify that a DataBlobWriter was created internally
+ # Verify that a DedicatedFormatWriter was created internally
data_writers = blob_writer.file_store_write.data_writers
self.assertGreater(len(data_writers), 0)
- # Check that the writer is a DataBlobWriter
+ # Check that the writer is a DedicatedFormatWriter
for writer in data_writers.values():
- from pypaimon.write.writer.data_blob_writer import DataBlobWriter
- self.assertIsInstance(writer, DataBlobWriter)
+ from pypaimon.write.writer.dedicated_format_writer import
DedicatedFormatWriter
+ self.assertIsInstance(writer, DedicatedFormatWriter)
blob_writer.close()
- def test_data_blob_writer_no_blob_column(self):
- """Test that DataBlobWriter raises error when no blob column is
found."""
+ def test_dedicated_format_writer_no_blob_column(self):
+ """Test that DedicatedFormatWriter raises error when no blob column is
found."""
from pypaimon import Schema
# Test schema without blob column
@@ -177,7 +177,7 @@ class DataBlobWriterTest(unittest.TestCase):
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
- # Test that a regular writer (not DataBlobWriter) was created
+ # Test that a regular writer (not DedicatedFormatWriter) was created
test_data = pa.Table.from_pydict({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie']
@@ -186,19 +186,19 @@ class DataBlobWriterTest(unittest.TestCase):
# Write data to trigger writer creation
writer.write_arrow(test_data)
- # Verify that a regular writer was created (not DataBlobWriter)
+ # Verify that a regular writer was created (not DedicatedFormatWriter)
data_writers = writer.file_store_write.data_writers
self.assertGreater(len(data_writers), 0)
- # Check that the writer is NOT a DataBlobWriter
+ # Check that the writer is NOT a DedicatedFormatWriter
for writer_instance in data_writers.values():
- from pypaimon.write.writer.data_blob_writer import DataBlobWriter
- self.assertNotIsInstance(writer_instance, DataBlobWriter)
+ from pypaimon.write.writer.dedicated_format_writer import
DedicatedFormatWriter
+ self.assertNotIsInstance(writer_instance, DedicatedFormatWriter)
writer.close()
- def test_data_blob_writer_multiple_blob_columns(self):
- """Test that DataBlobWriter supports multiple blob columns."""
+ def test_dedicated_format_writer_multiple_blob_columns(self):
+ """Test that DedicatedFormatWriter supports multiple blob columns."""
from pypaimon import Schema
# Test schema with multiple blob columns
@@ -242,7 +242,7 @@ class DataBlobWriterTest(unittest.TestCase):
result =
table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
self.assertEqual(result.num_rows, 3)
- def test_data_blob_writer_partial_write_with_write_type(self):
+ def test_dedicated_format_writer_partial_write_with_write_type(self):
"""Partial write (normal + blob subset) via with_write_type: split
must match batch columns."""
from pypaimon import Schema
@@ -292,7 +292,7 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b'])
self.assertEqual(out.column('name').to_pylist(), [None, None])
- def test_data_blob_writer_partial_write_normal_only_with_write_type(self):
+ def
test_dedicated_format_writer_partial_write_normal_only_with_write_type(self):
"""Partial write without blob columns in write_cols must not touch
blob split paths."""
from pypaimon import Schema
@@ -333,7 +333,7 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(out.column('name').to_pylist(), ['n'])
self.assertEqual(out.column('blob_data').to_pylist(), [None])
- def
test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self):
+ def
test_dedicated_format_writer_partial_write_single_blob_of_two_with_write_type(self):
"""with_write_type lists only one blob column: only that column gets
.blob files."""
from pypaimon import Schema
@@ -369,8 +369,8 @@ class DataBlobWriterTest(unittest.TestCase):
write_builder.new_commit().commit(commit_messages)
writer.close()
- def test_data_blob_writer_write_operations(self):
- """Test DataBlobWriter write operations with real data."""
+ def test_dedicated_format_writer_write_operations(self):
+ """Test DedicatedFormatWriter write operations with real data."""
from pypaimon import Schema
# Create schema with blob column
@@ -411,8 +411,8 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.close()
- def test_data_blob_writer_write_large_blob(self):
- """Test DataBlobWriter with large blob data (5MB per item) in 10
batches."""
+ def test_dedicated_format_writer_write_large_blob(self):
+ """Test DedicatedFormatWriter with very large blob data (50MB per
item) in 10 batches."""
from pypaimon import Schema
# Create schema with blob column
@@ -467,7 +467,7 @@ class DataBlobWriterTest(unittest.TestCase):
# Log progress for large data processing
print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows}
rows")
- # Record count is tracked internally by DataBlobWriter
+ # Record count is tracked internally by DedicatedFormatWriter
# Test prepare commit
commit_messages: CommitMessage = blob_writer.prepare_commit()
@@ -511,8 +511,8 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.close()
- def test_data_blob_writer_abort_functionality(self):
- """Test DataBlobWriter abort functionality."""
+ def test_dedicated_format_writer_abort_functionality(self):
+ """Test DedicatedFormatWriter abort functionality."""
from pypaimon import Schema
# Create schema with blob column
@@ -546,12 +546,12 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.write_arrow_batch(batch)
# Test abort - BatchTableWrite doesn't have abort method
- # The abort functionality is handled internally by DataBlobWriter
+ # The abort functionality is handled internally by
DedicatedFormatWriter
blob_writer.close()
- def test_data_blob_writer_multiple_batches(self):
- """Test DataBlobWriter with multiple batches and verify results."""
+ def test_dedicated_format_writer_multiple_batches(self):
+ """Test DedicatedFormatWriter with multiple batches and verify
results."""
from pypaimon import Schema
# Create schema with blob column
@@ -608,7 +608,7 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.write_arrow_batch(batch)
total_rows += batch.num_rows
- # Record count is tracked internally by DataBlobWriter
+ # Record count is tracked internally by DedicatedFormatWriter
# Test prepare commit
commit_messages = blob_writer.prepare_commit()
@@ -619,8 +619,8 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.close()
- def test_data_blob_writer_large_batches(self):
- """Test DataBlobWriter with large batches to test rolling behavior."""
+ def test_dedicated_format_writer_large_batches(self):
+ """Test DedicatedFormatWriter with large batches to test rolling
behavior."""
from pypaimon import Schema
# Create schema with blob column
@@ -671,7 +671,7 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.write_arrow_batch(batch)
total_rows += batch.num_rows
- # Record count is tracked internally by DataBlobWriter
+ # Record count is tracked internally by DedicatedFormatWriter
# Test prepare commit
commit_messages = blob_writer.prepare_commit()
@@ -682,8 +682,8 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.close()
- def test_data_blob_writer_mixed_data_types(self):
- """Test DataBlobWriter with mixed data types in blob column."""
+ def test_dedicated_format_writer_mixed_data_types(self):
+ """Test DedicatedFormatWriter with mixed data types in blob column."""
from pypaimon import Schema
# Create schema with blob column
@@ -726,7 +726,7 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.write_arrow_batch(batch)
total_rows += batch.num_rows
- # Record count is tracked internally by DataBlobWriter
+ # Record count is tracked internally by DedicatedFormatWriter
# Test prepare commit
commit_messages = blob_writer.prepare_commit()
@@ -786,8 +786,8 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(result_type, original_type, f"Row {i + 1}: Type
should match")
self.assertEqual(result_data, original_data, f"Row {i + 1}: Blob
data should match")
- def test_data_blob_writer_empty_batches(self):
- """Test DataBlobWriter with empty batches."""
+ def test_dedicated_format_writer_empty_batches(self):
+ """Test DedicatedFormatWriter with empty batches."""
from pypaimon import Schema
# Create schema with blob column
@@ -842,8 +842,8 @@ class DataBlobWriterTest(unittest.TestCase):
total_rows += batch.num_rows
# Verify record count (empty batch should not affect count)
- # Record count is tracked internally by DataBlobWriter
- # Record count is tracked internally by DataBlobWriter
+ # Record count is tracked internally by DedicatedFormatWriter
+ # Record count is tracked internally by DedicatedFormatWriter
# Test prepare commit
commit_messages = blob_writer.prepare_commit()
@@ -851,8 +851,8 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.close()
- def test_data_blob_writer_rolling_behavior(self):
- """Test DataBlobWriter rolling behavior with multiple commits."""
+ def test_dedicated_format_writer_rolling_behavior(self):
+ """Test DedicatedFormatWriter rolling behavior with multiple
commits."""
from pypaimon import Schema
# Create schema with blob column
@@ -892,7 +892,7 @@ class DataBlobWriterTest(unittest.TestCase):
blob_writer.write_arrow_batch(batch)
# Verify total record count
- # Record count is tracked internally by DataBlobWriter
+ # Record count is tracked internally by DedicatedFormatWriter
# Test prepare commit
commit_messages = blob_writer.prepare_commit()
@@ -2714,8 +2714,8 @@ class DataBlobWriterTest(unittest.TestCase):
actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
self.assertEqual(actual, expected)
- def test_data_blob_writer_with_slice(self):
- """Test DataBlobWriter with mixed data types in blob column."""
+ def test_dedicated_format_writer_with_slice(self):
+ """Test DedicatedFormatWriter with mixed data types in blob column."""
# Create schema with blob column
pa_schema = pa.schema([
@@ -2776,8 +2776,8 @@ class DataBlobWriterTest(unittest.TestCase):
self.assertEqual(result.num_columns, 3, "Should have 3 columns")
self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Get
incorrect column ID")
- def test_data_blob_writer_with_shard(self):
- """Test DataBlobWriter with mixed data types in blob column."""
+ def test_dedicated_format_writer_with_shard(self):
+ """Test DedicatedFormatWriter with mixed data types in blob column."""
# Create schema with blob column
pa_schema = pa.schema([
diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py
b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
new file mode 100644
index 0000000000..f89f3290f1
--- /dev/null
+++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
@@ -0,0 +1,1063 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Data evolution tests covering parquet + blob + vector (vortex) formats.
+
+Each test writes data using different file format combinations and reads it
+back, verifying correctness of the data evolution merge path across formats.
+"""
+
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+
+
+class DataEvolutionFormatsTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ # ------------------------------------------------------------------
+ # Parquet-format data evolution
+ # ------------------------------------------------------------------
+
+ def test_parquet_column_subset_write_and_merge_read(self):
+ """Write disjoint column subsets as parquet, merge-read via data
evolution."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('score', pa.float64()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_parquet_subset', schema, False)
+ table = self.catalog.get_table('default.fmt_parquet_subset')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: write id + name
+ w0 = wb.new_write().with_write_type(['id', 'name'])
+ w1 = wb.new_write().with_write_type(['score'])
+ c = wb.new_commit()
+ w0.write_arrow(pa.Table.from_pydict(
+ {'id': [1, 2, 3], 'name': ['a', 'b', 'c']},
+ schema=pa.schema([('id', pa.int32()), ('name', pa.string())])))
+ w1.write_arrow(pa.Table.from_pydict(
+ {'score': [1.1, 2.2, 3.3]},
+ schema=pa.schema([('score', pa.float64())])))
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # verify file format
+ all_files = [nf for m in cmts for nf in m.new_files]
+ for f in all_files:
+ self.assertTrue(f.file_name.endswith('.parquet'),
+ f"Expected parquet file, got {f.file_name}")
+
+ # read back
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ expect = pa.Table.from_pydict(
+ {'id': [1, 2, 3], 'name': ['a', 'b', 'c'], 'score': [1.1, 2.2,
3.3]},
+ schema=pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_parquet_overwrite_column(self):
+ """Write all columns, then overwrite one column via a second commit."""
+ pa_schema = pa.schema([
+ ('k', pa.int64()),
+ ('v', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_parquet_overwrite', schema,
False)
+ table = self.catalog.get_table('default.fmt_parquet_overwrite')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: full row
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'k': [10, 20], 'v': ['old1', 'old2']}, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: overwrite v only (first_row_id=0)
+ tw = wb.new_write().with_write_type(['v'])
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'v': ['new1', 'new2']}, schema=pa.schema([('v', pa.string())])))
+ cmts = tw.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ expect = pa.Table.from_pydict(
+ {'k': [10, 20], 'v': ['new1', 'new2']}, schema=pa_schema)
+ self.assertEqual(actual, expect)
+
+ def test_parquet_append_new_rows(self):
+ """Append new rows (new first_row_id) with column subsets, merge-read
all."""
+ pa_schema = pa.schema([
+ ('a', pa.int32()),
+ ('b', pa.string()),
+ ('c', pa.float32()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_parquet_append', schema, False)
+ table = self.catalog.get_table('default.fmt_parquet_append')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: 2 full rows
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'a': [1, 2], 'b': ['x', 'y'], 'c': [0.1, 0.2]}, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: append 2 new rows with column subsets, first_row_id=2
+ w_ab = wb.new_write().with_write_type(['a', 'b'])
+ w_c = wb.new_write().with_write_type(['c'])
+ tc = wb.new_commit()
+ w_ab.write_arrow(pa.Table.from_pydict(
+ {'a': [3, 4], 'b': ['z', 'w']},
+ schema=pa.schema([('a', pa.int32()), ('b', pa.string())])))
+ w_c.write_arrow(pa.Table.from_pydict(
+ {'c': [0.3, 0.4]},
+ schema=pa.schema([('c', pa.float32())])))
+ cmts = w_ab.prepare_commit() + w_c.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 2
+ tc.commit(cmts)
+ w_ab.close()
+ w_c.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 4)
+ expect = pa.Table.from_pydict(
+ {'a': [1, 2, 3, 4], 'b': ['x', 'y', 'z', 'w'],
+ 'c': [0.1, 0.2, 0.3, 0.4]},
+ schema=pa_schema)
+ self.assertEqual(actual, expect)
+
+ # ------------------------------------------------------------------
+ # Blob-format data evolution
+ # ------------------------------------------------------------------
+
+ def test_blob_write_and_read(self):
+ """Write a table with normal + blob columns, read back and verify."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('payload', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_blob_basic', schema, False)
+ table = self.catalog.get_table('default.fmt_blob_basic')
+ wb = table.new_batch_write_builder()
+
+ blobs = [b'hello world', b'\x00\x01\x02\xff', b'paimon blob']
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'id': [1, 2, 3], 'payload': blobs}, schema=pa_schema))
+ cmts = tw.prepare_commit()
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ # verify we produced both parquet and blob files
+ all_files = [nf for m in cmts for nf in m.new_files]
+ parquet_files = [f for f in all_files if
f.file_name.endswith('.parquet')]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertGreater(len(parquet_files), 0)
+ self.assertGreater(len(blob_files), 0)
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(actual.column('payload').to_pylist(), blobs)
+
+ def test_blob_column_subset_evolution(self):
+ """Write normal+blob cols in one commit, overwrite normal col in
another, merge-read."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('doc', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_blob_evolution', schema, False)
+ table = self.catalog.get_table('default.fmt_blob_evolution')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: write id + doc (normal + blob together)
+ tw = wb.new_write().with_write_type(['id', 'doc'])
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'id': [1, 2], 'doc': [b'doc_alice', b'doc_bob']},
+ schema=pa.schema([('id', pa.int32()), ('doc',
pa.large_binary())])))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: write name for the same rows (first_row_id=0)
+ tw = wb.new_write().with_write_type(['name'])
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'name': ['Alice', 'Bob']},
+ schema=pa.schema([('name', pa.string())])))
+ cmts = tw.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 2)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2])
+ self.assertEqual(actual.column('name').to_pylist(), ['Alice', 'Bob'])
+ self.assertEqual(actual.column('doc').to_pylist(), [b'doc_alice',
b'doc_bob'])
+
+ def test_blob_append_with_subset_evolution(self):
+ """Write normal+blob subset in first commit, add remaining col via
evolution."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('tag', pa.string()),
+ ('picture', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_blob_append_evo', schema, False)
+ table = self.catalog.get_table('default.fmt_blob_append_evo')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: id + picture (normal + blob)
+ tw = wb.new_write().with_write_type(['id', 'picture'])
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'id': [1, 2], 'picture': [b'pic1', b'pic2']},
+ schema=pa.schema([('id', pa.int32()), ('picture',
pa.large_binary())])))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: add tag for the same rows
+ tw = wb.new_write().with_write_type(['tag'])
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'tag': ['t1', 't2']},
+ schema=pa.schema([('tag', pa.string())])))
+ cmts = tw.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 2)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2])
+ self.assertEqual(actual.column('tag').to_pylist(), ['t1', 't2'])
+ self.assertEqual(actual.column('picture').to_pylist(), [b'pic1',
b'pic2'])
+
+ def test_blob_multiple_blob_columns(self):
+ """Table with two blob columns, write and read both."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('audio', pa.large_binary()),
+ ('video', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_blob_multi', schema, False)
+ table = self.catalog.get_table('default.fmt_blob_multi')
+ wb = table.new_batch_write_builder()
+
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'audio': [b'audio_1', b'audio_2'],
+ 'video': [b'video_1', b'video_2'],
+ }, schema=pa_schema))
+ cmts = tw.prepare_commit()
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ # verify blob files were produced
+ all_files = [nf for m in cmts for nf in m.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertGreaterEqual(len(blob_files), 2)
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 2)
+ self.assertEqual(actual.column('audio').to_pylist(), [b'audio_1',
b'audio_2'])
+ self.assertEqual(actual.column('video').to_pylist(), [b'video_1',
b'video_2'])
+
+ # ------------------------------------------------------------------
+ # Vortex-format data evolution
+ # ------------------------------------------------------------------
+
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ @unittest.skipUnless(
+ __import__('importlib').util.find_spec('vortex') is not None,
+ "vortex not installed")
+ def test_vortex_column_subset_write_and_merge_read(self):
+ """Write disjoint column subsets as vortex, merge-read via data
evolution."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('tag', pa.string()),
+ ('val', pa.float64()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'vortex',
+ })
+ self.catalog.create_table('default.fmt_vortex_subset', schema, False)
+ table = self.catalog.get_table('default.fmt_vortex_subset')
+ wb = table.new_batch_write_builder()
+
+ w0 = wb.new_write().with_write_type(['id', 'tag'])
+ w1 = wb.new_write().with_write_type(['val'])
+ c = wb.new_commit()
+ w0.write_arrow(pa.Table.from_pydict(
+ {'id': [10, 20, 30], 'tag': ['p', 'q', 'r']},
+ schema=pa.schema([('id', pa.int32()), ('tag', pa.string())])))
+ w1.write_arrow(pa.Table.from_pydict(
+ {'val': [1.5, 2.5, 3.5]},
+ schema=pa.schema([('val', pa.float64())])))
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ # verify vortex files
+ all_files = [nf for m in cmts for nf in m.new_files]
+ for f in all_files:
+ self.assertTrue(f.file_name.endswith('.vortex'),
+ f"Expected vortex file, got {f.file_name}")
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ expect = pa.Table.from_pydict(
+ {'id': [10, 20, 30], 'tag': ['p', 'q', 'r'], 'val': [1.5, 2.5,
3.5]},
+ schema=pa_schema)
+ self.assertEqual(actual, expect)
+
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ @unittest.skipUnless(
+ __import__('importlib').util.find_spec('vortex') is not None,
+ "vortex not installed")
+ def test_vortex_overwrite_column(self):
+ """Full row write then overwrite one column, all in vortex format."""
+ pa_schema = pa.schema([
+ ('k', pa.int64()),
+ ('v', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'vortex',
+ })
+ self.catalog.create_table('default.fmt_vortex_overwrite', schema,
False)
+ table = self.catalog.get_table('default.fmt_vortex_overwrite')
+ wb = table.new_batch_write_builder()
+
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'k': [100, 200], 'v': ['old', 'old']}, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ tw = wb.new_write().with_write_type(['v'])
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'v': ['new', 'new']}, schema=pa.schema([('v', pa.string())])))
+ cmts = tw.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual, pa.Table.from_pydict(
+ {'k': [100, 200], 'v': ['new', 'new']}, schema=pa_schema))
+
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ @unittest.skipUnless(
+ __import__('importlib').util.find_spec('vortex') is not None,
+ "vortex not installed")
+ def test_vortex_append_new_rows(self):
+ """Append new rows with column subsets in vortex format."""
+ pa_schema = pa.schema([
+ ('x', pa.int32()),
+ ('y', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'vortex',
+ })
+ self.catalog.create_table('default.fmt_vortex_append', schema, False)
+ table = self.catalog.get_table('default.fmt_vortex_append')
+ wb = table.new_batch_write_builder()
+
+ # commit 1
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'x': [1, 2], 'y': ['a', 'b']}, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: append with subsets, first_row_id=2
+ w_x = wb.new_write().with_write_type(['x'])
+ w_y = wb.new_write().with_write_type(['y'])
+ tc = wb.new_commit()
+ w_x.write_arrow(pa.Table.from_pydict(
+ {'x': [3]}, schema=pa.schema([('x', pa.int32())])))
+ w_y.write_arrow(pa.Table.from_pydict(
+ {'y': ['c']}, schema=pa.schema([('y', pa.string())])))
+ cmts = w_x.prepare_commit() + w_y.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 2
+ tc.commit(cmts)
+ w_x.close()
+ w_y.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ expect = pa.Table.from_pydict(
+ {'x': [1, 2, 3], 'y': ['a', 'b', 'c']}, schema=pa_schema)
+ self.assertEqual(actual, expect)
+
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ @unittest.skipUnless(
+ __import__('importlib').util.find_spec('vortex') is not None,
+ "vortex not installed")
+ def test_vortex_with_row_id_and_filter(self):
+ """Write vortex data, read with _ROW_ID projection and filter."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('val', pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'vortex',
+ })
+ self.catalog.create_table('default.fmt_vortex_rowid_filter', schema,
False)
+ table = self.catalog.get_table('default.fmt_vortex_rowid_filter')
+ wb = table.new_batch_write_builder()
+
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'id': list(range(10)), 'val': [f'v{i}' for i in range(10)]},
+ schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # full read
+ rb = table.new_read_builder()
+ full = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(full.num_rows, 10)
+
+ # filter by _ROW_ID
+ rb_rid = table.new_read_builder().with_projection(['id', 'val',
'_ROW_ID'])
+ pb = rb_rid.new_predicate_builder()
+ rb_f = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 5))
+ actual = rb_f.new_read().to_arrow(rb_f.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 1)
+ self.assertEqual(actual.column('id')[0].as_py(), 5)
+ self.assertEqual(actual.column('val')[0].as_py(), 'v5')
+
+ # ------------------------------------------------------------------
+ # Vector (vortex) file format for embedding columns
+ # ------------------------------------------------------------------
+
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ @unittest.skipUnless(
+ __import__('importlib').util.find_spec('vortex') is not None,
+ "vortex not installed")
+ def test_vector_vortex_write_and_read(self):
+ """Write table with normal + vector columns using vortex vector
format."""
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('embed', pa.list_(pa.float32(), 4)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'vortex',
+ 'vector.file.format': 'vortex',
+ })
+ self.catalog.create_table('default.fmt_vec_vortex', schema, False)
+ table = self.catalog.get_table('default.fmt_vec_vortex')
+
+ embeddings = [1.0, 0.0, 0.0, 0.0,
+ 0.0, 1.0, 0.0, 0.0,
+ 0.0, 0.0, 1.0, 0.0]
+ test_data = pa.table({
+ 'id': pa.array([1, 2, 3], type=pa.int64()),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array(embeddings, type=pa.float32()), 4),
+ })
+
+ wb = table.new_batch_write_builder()
+ tw = wb.new_write()
+ tw.write_arrow(test_data)
+ cmts = tw.prepare_commit()
+
+ # should produce both normal and vector files
+ all_files = [nf for m in cmts for nf in m.new_files]
+ normal_files = [f for f in all_files if not
DataFileMeta.is_vector_file(f.file_name)]
+ vector_files = [f for f in all_files if
DataFileMeta.is_vector_file(f.file_name)]
+ self.assertGreater(len(normal_files), 0)
+ self.assertGreater(len(vector_files), 0)
+ for vf in vector_files:
+ self.assertIn('.vector.vortex', vf.file_name)
+
+ wb.new_commit().commit(cmts)
+ tw.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ embed_col = actual.column('embed')
+ self.assertEqual(embed_col[0].as_py(), [1.0, 0.0, 0.0, 0.0])
+ self.assertEqual(embed_col[1].as_py(), [0.0, 1.0, 0.0, 0.0])
+ self.assertEqual(embed_col[2].as_py(), [0.0, 0.0, 1.0, 0.0])
+
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ @unittest.skipUnless(
+ __import__('importlib').util.find_spec('vortex') is not None,
+ "vortex not installed")
+ def test_vector_vortex_multiple_appends(self):
+ """Append multiple batches of normal+vector data and read all back."""
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('label', pa.string()),
+ ('embed', pa.list_(pa.float32(), 3)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'vortex',
+ 'vector.file.format': 'vortex',
+ })
+ self.catalog.create_table('default.fmt_vec_vortex_append', schema,
False)
+ table = self.catalog.get_table('default.fmt_vec_vortex_append')
+ wb = table.new_batch_write_builder()
+
+ # commit 1
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'id': pa.array([1, 2], type=pa.int64()),
+ 'label': pa.array(['cat', 'dog']),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32()),
3),
+ }))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: append
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'id': pa.array([3], type=pa.int64()),
+ 'label': pa.array(['bird']),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.7, 0.8, 0.9], type=pa.float32()), 3),
+ }))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(actual.column('label').to_pylist(), ['cat', 'dog',
'bird'])
+ embed_col = actual.column('embed')
+ self.assertAlmostEqual(embed_col[0].as_py()[0], 0.1, places=5)
+ self.assertAlmostEqual(embed_col[2].as_py()[2], 0.9, places=5)
+
+ # ------------------------------------------------------------------
+ # Mixed formats: parquet + blob + vector in one table
+ # ------------------------------------------------------------------
+
+ def test_parquet_and_blob_mixed_append(self):
+ """Table with normal parquet cols + blob col, append new rows."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('image', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_mixed_parquet_blob', schema,
False)
+ table = self.catalog.get_table('default.fmt_mixed_parquet_blob')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: first batch
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict({
+ 'id': [1, 2],
+ 'name': ['a', 'b'],
+ 'image': [b'img1', b'img2'],
+ }, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: append more rows
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict({
+ 'id': [3, 4],
+ 'name': ['c', 'd'],
+ 'image': [b'img3', b'img4'],
+ }, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 4)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3, 4])
+ self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 'c',
'd'])
+ self.assertEqual(actual.column('image').to_pylist(),
+ [b'img1', b'img2', b'img3', b'img4'])
+
+ @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python
>= 3.11")
+ @unittest.skipUnless(
+ __import__('importlib').util.find_spec('vortex') is not None,
+ "vortex not installed")
+ def test_vortex_and_vector_vortex_mixed(self):
+ """Table with normal (vortex) + vector (vortex) columns, write and
read.
+
+ Verifies that the writer produces separate .vortex and .vector.vortex
files,
+ and the data evolution merge reader stitches them back together.
+ """
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('name', pa.string()),
+ ('embed', pa.list_(pa.float32(), 3)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'vortex',
+ 'vector.file.format': 'vortex',
+ })
+ self.catalog.create_table('default.fmt_vortex_vector', schema, False)
+ table = self.catalog.get_table('default.fmt_vortex_vector')
+ wb = table.new_batch_write_builder()
+
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'id': pa.array([1, 2, 3], type=pa.int64()),
+ 'name': pa.array(['cat', 'dog', 'bird']),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
+ type=pa.float32()), 3),
+ }))
+ cmts = tw.prepare_commit()
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ # verify two file types: .vortex + .vector.vortex
+ all_files = [nf for m in cmts for nf in m.new_files]
+ normal_files = [f for f in all_files if not
DataFileMeta.is_vector_file(f.file_name)]
+ vector_files = [f for f in all_files if
DataFileMeta.is_vector_file(f.file_name)]
+ self.assertGreater(len(normal_files), 0, "should produce normal vortex
files")
+ self.assertGreater(len(vector_files), 0, "should produce vector files")
+ for nf in normal_files:
+ self.assertTrue(nf.file_name.endswith('.vortex'))
+ for vf in vector_files:
+ self.assertIn('.vector.vortex', vf.file_name)
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(actual.column('name').to_pylist(), ['cat', 'dog',
'bird'])
+ embed = actual.column('embed')
+ self.assertAlmostEqual(embed[0].as_py()[0], 0.1, places=5)
+ self.assertAlmostEqual(embed[2].as_py()[2], 0.9, places=5)
+
+ def test_blob_and_vector_inline_mixed(self):
+ """Table with normal + blob + vector(inline) columns, write and read.
+
+ When blob columns are present, vector columns are stored inline in the
+ parquet file (not as separate .vector files). This test verifies the
+ blob+inline-vector path works correctly.
+ """
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('doc', pa.large_binary()),
+ ('embed', pa.list_(pa.float32(), 3)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_blob_vector_inline', schema,
False)
+ table = self.catalog.get_table('default.fmt_blob_vector_inline')
+ wb = table.new_batch_write_builder()
+
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'id': pa.array([1, 2], type=pa.int64()),
+ 'doc': pa.array([b'doc1', b'doc2'], type=pa.large_binary()),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32()),
3),
+ }))
+ cmts = tw.prepare_commit()
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ # verify parquet + blob files
+ all_files = [nf for m in cmts for nf in m.new_files]
+ parquet_files = [f for f in all_files if
f.file_name.endswith('.parquet')]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertGreater(len(parquet_files), 0, "should produce parquet
files")
+ self.assertGreater(len(blob_files), 0, "should produce blob files")
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 2)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2])
+ self.assertEqual(actual.column('doc').to_pylist(), [b'doc1', b'doc2'])
+ embed = actual.column('embed')
+ self.assertAlmostEqual(embed[0].as_py()[0], 0.1, places=5)
+ self.assertAlmostEqual(embed[1].as_py()[2], 0.6, places=5)
+
+ def test_blob_and_vector_with_vector_file_format(self):
+ """Table with blob + vector columns and explicit vector.file.format.
+
+ DedicatedFormatWriter splits data three ways: normal columns to
.parquet,
+ blob columns to .blob, and vector columns to .vector.<format> files.
+ """
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('doc', pa.large_binary()),
+ ('embed', pa.list_(pa.float32(), 3)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'vector.file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_blob_vec_format', schema, False)
+ table = self.catalog.get_table('default.fmt_blob_vec_format')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: write all columns
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'id': pa.array([1, 2, 3], type=pa.int64()),
+ 'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([1.0, 0.0, 0.0,
+ 0.0, 1.0, 0.0,
+ 0.0, 0.0, 1.0], type=pa.float32()), 3),
+ }))
+ cmts = tw.prepare_commit()
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ # DedicatedFormatWriter produces parquet + blob + vector files
+ all_files = [nf for m in cmts for nf in m.new_files]
+ parquet_files = [f for f in all_files
+ if f.file_name.endswith('.parquet')
+ and not DataFileMeta.is_vector_file(f.file_name)]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ vector_files = [f for f in all_files if
DataFileMeta.is_vector_file(f.file_name)]
+ self.assertGreater(len(parquet_files), 0, "should produce normal
parquet files")
+ self.assertGreater(len(blob_files), 0, "should produce blob files")
+ self.assertGreater(len(vector_files), 0, "should produce vector files")
+ for vf in vector_files:
+ self.assertIn('.vector.parquet', vf.file_name)
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb',
b'ccc'])
+ self.assertEqual(actual.column('embed')[0].as_py(), [1.0, 0.0, 0.0])
+ self.assertEqual(actual.column('embed')[1].as_py(), [0.0, 1.0, 0.0])
+ self.assertEqual(actual.column('embed')[2].as_py(), [0.0, 0.0, 1.0])
+
+ # commit 2: append more rows
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'id': pa.array([4, 5], type=pa.int64()),
+ 'doc': pa.array([b'ddd', b'eee'], type=pa.large_binary()),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.5, 0.5, 0.0,
+ 0.0, 0.5, 0.5], type=pa.float32()), 3),
+ }))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ actual2 = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual2.num_rows, 5)
+ self.assertEqual(actual2.column('id').to_pylist(), [1, 2, 3, 4, 5])
+ self.assertEqual(actual2.column('doc').to_pylist(),
+ [b'aaa', b'bbb', b'ccc', b'ddd', b'eee'])
+
+ def test_blob_vector_partial_write_vector_only(self):
+ """Blob+vector table with with_write_type(['embed']) — vector-only
partial write.
+
+ When normal_column_names is empty, the writer must still flush vector
+ metadata without crashing on an empty normal data path.
+ """
+ pa_schema = pa.schema([
+ ('id', pa.int64()),
+ ('doc', pa.large_binary()),
+ ('embed', pa.list_(pa.float32(), 3)),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'vector.file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_blob_vec_partial', schema,
False)
+ table = self.catalog.get_table('default.fmt_blob_vec_partial')
+ wb = table.new_batch_write_builder()
+
+ # commit 1: write all columns
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'id': pa.array([1, 2, 3], type=pa.int64()),
+ 'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()),
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([1.0, 0.0, 0.0,
+ 0.0, 1.0, 0.0,
+ 0.0, 0.0, 1.0], type=pa.float32()), 3),
+ }))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ # commit 2: write only vector column — no normal columns
+ tw = wb.new_write().with_write_type(['embed'])
+ tc = wb.new_commit()
+ tw.write_arrow(pa.table({
+ 'embed': pa.FixedSizeListArray.from_arrays(
+ pa.array([0.5, 0.5, 0.0,
+ 0.0, 0.5, 0.5,
+ 0.5, 0.0, 0.5], type=pa.float32()), 3),
+ }))
+ cmts = tw.prepare_commit()
+
+ # should produce only vector files, no normal or blob files
+ all_files = [nf for m in cmts for nf in m.new_files]
+ self.assertGreater(len(all_files), 0, "should produce vector files")
+ for f in all_files:
+ self.assertTrue(DataFileMeta.is_vector_file(f.file_name),
+ f"Expected vector file, got {f.file_name}")
+
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ tc.commit(cmts)
+ tw.close()
+ tc.close()
+
+ # read back and verify the vector column was updated
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 3)
+ self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+ self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb',
b'ccc'])
+ embed = actual.column('embed')
+ self.assertEqual(embed[0].as_py(), [0.5, 0.5, 0.0])
+ self.assertEqual(embed[1].as_py(), [0.0, 0.5, 0.5])
+ self.assertEqual(embed[2].as_py(), [0.5, 0.0, 0.5])
+
+ # ------------------------------------------------------------------
+ # Projection and _ROW_ID across formats
+ # ------------------------------------------------------------------
+
+ def test_blob_with_row_id_projection(self):
+ """Read blob table with _ROW_ID projection."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('data', pa.large_binary()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ })
+ self.catalog.create_table('default.fmt_blob_rowid', schema, False)
+ table = self.catalog.get_table('default.fmt_blob_rowid')
+ wb = table.new_batch_write_builder()
+
+ tw = wb.new_write()
+ tc = wb.new_commit()
+ tw.write_arrow(pa.Table.from_pydict(
+ {'id': [10, 20], 'data': [b'aa', b'bb']}, schema=pa_schema))
+ tc.commit(tw.prepare_commit())
+ tw.close()
+ tc.close()
+
+ rb = table.new_read_builder()
+ rb.with_projection(['id', 'data', '_ROW_ID'])
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, 2)
+ self.assertEqual(actual.column('_ROW_ID').to_pylist(), [0, 1])
+ self.assertEqual(actual.column('id').to_pylist(), [10, 20])
+ self.assertEqual(actual.column('data').to_pylist(), [b'aa', b'bb'])
+
+ def test_parquet_large_data_evolution(self):
+ """Larger dataset: 1000 rows, column-subset write+merge."""
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('col_a', pa.string()),
+ ('col_b', pa.float64()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'file.format': 'parquet',
+ })
+ self.catalog.create_table('default.fmt_parquet_large', schema, False)
+ table = self.catalog.get_table('default.fmt_parquet_large')
+ wb = table.new_batch_write_builder()
+
+ n = 1000
+ w0 = wb.new_write().with_write_type(['id', 'col_a'])
+ w1 = wb.new_write().with_write_type(['col_b'])
+ c = wb.new_commit()
+ w0.write_arrow(pa.Table.from_pydict(
+ {'id': list(range(n)), 'col_a': [f's{i}' for i in range(n)]},
+ schema=pa.schema([('id', pa.int32()), ('col_a', pa.string())])))
+ w1.write_arrow(pa.Table.from_pydict(
+ {'col_b': [float(i) for i in range(n)]},
+ schema=pa.schema([('col_b', pa.float64())])))
+ cmts = w0.prepare_commit() + w1.prepare_commit()
+ for m in cmts:
+ for nf in m.new_files:
+ nf.first_row_id = 0
+ c.commit(cmts)
+ w0.close()
+ w1.close()
+ c.close()
+
+ rb = table.new_read_builder()
+ actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+ self.assertEqual(actual.num_rows, n)
+ self.assertEqual(actual.column('id').to_pylist(), list(range(n)))
+ self.assertEqual(actual.column('col_a').to_pylist(), [f's{i}' for i in
range(n)])
+ self.assertEqual(actual.column('col_b').to_pylist(), [float(i) for i
in range(n)])
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/paimon-python/pypaimon/write/file_store_write.py
b/paimon-python/pypaimon/write/file_store_write.py
index c31a9f8a91..6a20708a14 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -23,7 +23,7 @@ import pyarrow as pa
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
-from pypaimon.write.writer.data_blob_writer import DataBlobWriter
+from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter
from pypaimon.write.writer.data_vector_writer import DataVectorWriter
from pypaimon.write.writer.data_writer import DataWriter
from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
@@ -65,7 +65,7 @@ class FileStoreWrite:
# Check if table has blob columns
if self._has_blob_columns():
- return DataBlobWriter(
+ return DedicatedFormatWriter(
table=self.table,
partition=partition,
bucket=bucket,
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py
b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
similarity index 73%
rename from paimon-python/pypaimon/write/writer/data_blob_writer.py
rename to paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index cb131f9a54..6e7ea57781 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -25,53 +25,25 @@ from pypaimon.common.options.core_options import CoreOptions
from pypaimon.data.timestamp import Timestamp
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.schema.data_types import VectorType
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.write.writer.data_writer import DataWriter
logger = logging.getLogger(__name__)
-class DataBlobWriter(DataWriter):
- """
- A rolling file writer that handles both normal data and blob data. This
writer creates separate
- files for normal columns and blob columns, managing their lifecycle
independently.
-
- For example, given a table schema with normal columns (id INT, name
STRING) and blob columns
- (pic1 BLOB, pic2 BLOB), this writer will create separate files for normal
columns and each
- blob-file column.
-
- Key features:
- - Blob data can roll independently when normal data doesn't need rolling
- - When normal data rolls, blob data MUST also be closed (Java behavior)
- - Blob data uses more aggressive rolling (smaller target size) to prevent
memory issues
- - One normal data file may correspond to multiple blob data files
- - Blob data is written immediately to disk to prevent memory corruption
- - Blob file metadata is stored as separate DataFileMeta objects after
normal file metadata
- - When TableWrite.with_write_type narrows columns, incoming batches only
carry that subset;
- column lists are narrowed accordingly so splitting never selects missing
columns.
-
- Rolling behavior:
- - Normal data rolls: Both normal and blob writers are closed together,
blob metadata added after normal metadata
- - Blob data rolls independently: Only blob writer is closed, blob metadata
is cached until normal data rolls
-
- Metadata organization:
- - Normal file metadata is added first to committed_files
- - Blob file metadata is added after normal file metadata in committed_files
- - When blob rolls independently, metadata is cached until normal data rolls
- - Result: [normal_meta, blob_meta1, blob_meta2, blob_meta3, ...]
-
- Example file organization:
- committed_files = [
- normal_file1_meta, # f1.parquet metadata
- blob_file1_meta, # b1.blob metadata
- blob_file2_meta, # b2.blob metadata
- blob_file3_meta, # b3.blob metadata
- normal_file2_meta, # f1-2.parquet metadata
- blob_file4_meta, # b4.blob metadata
- blob_file5_meta, # b5.blob metadata
- ]
-
- This matches the Java RollingBlobFileWriter behavior exactly.
+class DedicatedFormatWriter(DataWriter):
+ """A rolling file writer that writes normal, blob, and vector columns to
dedicated files.
+
+ Splits incoming data three ways:
+ - Normal columns → standard data files (.parquet / .orc / .vortex / …)
+ - Blob columns (large_binary) → .blob files
+ - Vector columns (when vector.file.format is configured) →
.vector.<format> files
+
+ This mirrors Java's DedicatedFormatRollingFileWriter.
+
+ Metadata order in committed_files:
+ [normal_meta, blob_meta1, …, vector_meta1, …]
"""
# Constant for checking rolling condition periodically
@@ -101,6 +73,13 @@ class DataBlobWriter(DataWriter):
full_blob_file_set = set(full_blob_file_column_names)
all_column_names = self.table.field_names
+ # Detect vector columns that should be written to dedicated files.
+ full_vector_column_names = self._get_vector_columns_from_schema()
+ full_vector_set = set(full_vector_column_names)
+ # Only split vector columns when vector.file.format is configured.
+ has_dedicated_vector = bool(full_vector_column_names) and
options.with_vector_format()
+ dedicated_set = full_blob_file_set | (full_vector_set if
has_dedicated_vector else set())
+
# Narrow columns when TableWrite.with_write_type(...) supplies a
partial column list.
# Incoming RecordBatches only contain those columns; selecting full
normal/blob lists
# would raise KeyError.
@@ -109,13 +88,17 @@ class DataBlobWriter(DataWriter):
self.blob_file_column_names = [
col for col in full_blob_file_column_names if col in
write_col_set
]
+ self.vector_write_columns = [
+ col for col in full_vector_column_names if col in write_col_set
+ ] if has_dedicated_vector else []
self.normal_column_names = [
- col for col in write_cols if col not in full_blob_file_set
+ col for col in write_cols if col not in dedicated_set
]
else:
self.blob_file_column_names = list(full_blob_file_column_names)
+ self.vector_write_columns = list(full_vector_column_names) if
has_dedicated_vector else []
self.normal_column_names = [
- col for col in all_column_names if col not in
full_blob_file_set
+ col for col in all_column_names if col not in dedicated_set
]
normal_name_set = set(self.normal_column_names)
self.normal_columns = [
@@ -143,6 +126,20 @@ class DataBlobWriter(DataWriter):
options=options
)
+ # Initialize vector writer when vector.file.format is configured.
+ from pypaimon.write.writer.vector_writer import VectorWriter
+ self.vector_writer: Optional[VectorWriter] = None
+ if self.vector_write_columns:
+ self.vector_writer = VectorWriter(
+ table=self.table,
+ partition=self.partition,
+ bucket=self.bucket,
+ max_seq_number=max_seq_number,
+ vector_columns=self.vector_write_columns,
+ vector_file_format=options.vector_file_format(),
+ options=options,
+ )
+
# Initialize ExternalStorageBlobWriter if configured
self._external_storage_writer = None
external_storage_fields = self.options.blob_external_storage_fields()
@@ -159,10 +156,11 @@ class DataBlobWriter(DataWriter):
)
logger.info(
- "Initialized DataBlobWriter with blob columns: %s, blob file
columns: %s, descriptor "
- "stored columns: %s, external storage fields: %s",
+ "Initialized DedicatedFormatWriter with blob columns: %s, blob
file columns: %s, "
+ "vector columns: %s, descriptor stored columns: %s, external
storage fields: %s",
self.blob_column_names,
self.blob_file_column_names,
+ self.vector_write_columns,
sorted(self.blob_descriptor_fields),
sorted(external_storage_fields) if external_storage_fields else [],
)
@@ -178,8 +176,14 @@ class DataBlobWriter(DataWriter):
raise ValueError("No blob field found in table schema.")
return blob_columns
+ def _get_vector_columns_from_schema(self) -> List[str]:
+ return [
+ field.name for field in self.table.table_schema.fields
+ if isinstance(field.type, VectorType)
+ ]
+
def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
- normal_data, _ = self._split_data(data)
+ normal_data, _, _ = self._split_data(data)
return normal_data
def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) ->
pa.Table:
@@ -192,22 +196,27 @@ class DataBlobWriter(DataWriter):
if self._external_storage_writer:
data = self._external_storage_writer.transform_batch(data)
- # Split data into normal and blob parts
- normal_data, blob_data_map = self._split_data(data)
+ # Split data into normal, blob, and vector parts
+ normal_data, blob_data_map, vector_data = self._split_data(data)
self._validate_descriptor_stored_fields_input(data)
- # Process and accumulate normal data
+ # Process and accumulate normal data (may be None for partial
writes)
processed_normal = self._process_normal_data(normal_data)
- if self.pending_normal_data is None:
- self.pending_normal_data = processed_normal
- else:
- self.pending_normal_data =
self._merge_normal_data(self.pending_normal_data, processed_normal)
+ if processed_normal is not None:
+ if self.pending_normal_data is None:
+ self.pending_normal_data = processed_normal
+ else:
+ self.pending_normal_data =
self._merge_normal_data(self.pending_normal_data, processed_normal)
# Write blob-file columns to dedicated blob writers.
for blob_column, blob_data in blob_data_map.items():
if blob_data is not None and blob_data.num_rows > 0:
self.blob_writers[blob_column].write(blob_data)
+ # Write vector columns to dedicated vector writer.
+ if self.vector_writer is not None and vector_data is not None and
vector_data.num_rows > 0:
+ self.vector_writer.write(vector_data)
+
self.record_count += data.num_rows
# Check if normal data rolling is needed
@@ -231,8 +240,7 @@ class DataBlobWriter(DataWriter):
return
try:
- if self.pending_normal_data is not None and
self.pending_normal_data.num_rows > 0:
- self._close_current_writers()
+ self._close_current_writers()
if self._external_storage_writer:
self._external_storage_writer.close()
except Exception as e:
@@ -246,18 +254,27 @@ class DataBlobWriter(DataWriter):
"""Abort all writers and clean up resources."""
for blob_writer in self.blob_writers.values():
blob_writer.abort()
+ if self.vector_writer is not None:
+ self.vector_writer.abort()
if self._external_storage_writer:
self._external_storage_writer.abort()
self.pending_normal_data = None
self.committed_files.clear()
- def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch,
Dict[str, pa.RecordBatch]]:
- """Split data into normal and blob parts based on column names."""
+ def _split_data(self, data: pa.RecordBatch) -> Tuple[
+ pa.RecordBatch, Dict[str, pa.RecordBatch],
Optional[pa.RecordBatch]]:
+ """Split data into normal, blob, and vector parts based on column
names."""
normal_data = data.select(self.normal_column_names) if
self.normal_column_names else None
blob_data_map = {
blob_column: data.select([blob_column]) for blob_column in
self.blob_file_column_names
}
- return normal_data, blob_data_map
+ vector_data = (
+ pa.RecordBatch.from_arrays(
+ [data.column(name) for name in self.vector_write_columns],
+ names=self.vector_write_columns,
+ ) if self.vector_write_columns else None
+ )
+ return normal_data, blob_data_map, vector_data
def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch):
if not self.blob_descriptor_fields:
@@ -293,10 +310,10 @@ class DataBlobWriter(DataWriter):
) from e
@staticmethod
- def _process_normal_data(data: pa.RecordBatch) -> pa.Table:
+ def _process_normal_data(data: pa.RecordBatch) -> Optional[pa.Table]:
"""Process normal data (similar to base DataWriter)."""
if data is None or data.num_rows == 0:
- return pa.Table.from_batches([])
+ return None
return pa.Table.from_batches([data])
@staticmethod
@@ -316,30 +333,36 @@ class DataBlobWriter(DataWriter):
return current_size > self.target_file_size
def _close_current_writers(self):
- """Close both normal and blob writers and add blob metadata after
normal metadata (Java behavior)."""
- if self.pending_normal_data is None or
self.pending_normal_data.num_rows == 0:
- return
-
- # Close normal writer and get metadata
- normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
+ """Close normal, blob, and vector writers; add metadata in order:
normal, blob, vector."""
+ normal_meta = None
+ if self.pending_normal_data is not None and
self.pending_normal_data.num_rows > 0:
+ normal_meta =
self._write_normal_data_to_file(self.pending_normal_data)
blob_metas = []
for blob_column in self.blob_file_column_names:
writer_metas = self.blob_writers[blob_column].prepare_commit()
- self._validate_consistency(normal_meta, writer_metas, blob_column)
+ if normal_meta is not None:
+ self._validate_consistency(normal_meta, writer_metas,
blob_column)
blob_metas.extend(writer_metas)
- # Add normal file metadata first
- self.committed_files.append(normal_meta)
+ vector_metas = []
+ if self.vector_writer is not None:
+ vector_metas = self.vector_writer.prepare_commit()
+ self.vector_writer.committed_files.clear()
+ if vector_metas and normal_meta is not None:
+ self._validate_consistency(normal_meta, vector_metas, 'vector')
- # Add blob file metadata after normal metadata
+ if normal_meta is not None:
+ self.committed_files.append(normal_meta)
self.committed_files.extend(blob_metas)
+ self.committed_files.extend(vector_metas)
- # Reset pending data
self.pending_normal_data = None
- logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
- f"added {len(blob_metas)} blob file metadata after normal
metadata")
+ if normal_meta is not None or blob_metas or vector_metas:
+ normal_name = normal_meta.file_name if normal_meta is not None
else '<none>'
+ logger.info(f"Closed writers - normal: {normal_name}, "
+ f"{len(blob_metas)} blob metas, {len(vector_metas)}
vector metas")
def _write_normal_data_to_file(self, data: pa.Table) ->
Optional[DataFileMeta]:
if data.num_rows == 0: