This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a2671e9c1a [python] Rename DataBlobWriter to DedicatedFormatWriter and 
support blob+vector splitting (#8027)
a2671e9c1a is described below

commit a2671e9c1ad7abb33ca51541060af24d0a9ea36e
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri May 29 11:42:21 2026 +0800

    [python] Rename DataBlobWriter to DedicatedFormatWriter and support 
blob+vector splitting (#8027)
    
    DataBlobWriter now handles normal + blob + vector columns (matching
    Java's DedicatedFormatRollingFileWriter), so rename it to
    DedicatedFormatWriter. Also add data evolution format tests covering
    parquet, blob, and vector paths.
---
 .../pypaimon/read/reader/format_vortex_reader.py   |   28 +-
 paimon-python/pypaimon/tests/blob_table_test.py    |  104 +-
 .../pypaimon/tests/data_evolution_formats_test.py  | 1063 ++++++++++++++++++++
 paimon-python/pypaimon/write/file_store_write.py   |    4 +-
 ...a_blob_writer.py => dedicated_format_writer.py} |  169 ++--
 5 files changed, 1224 insertions(+), 144 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py 
b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
index ac48ca1abe..fcd50a24f0 100644
--- a/paimon-python/pypaimon/read/reader/format_vortex_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
@@ -86,29 +86,23 @@ class FormatVortexReader(RecordBatchReader):
             PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields 
else None
         )
 
-        # Collect predicate-referenced fields for targeted view type casting
-        self._cast_fields = predicate_fields if predicate_fields and 
vortex_expr is not None else set()
-
     @staticmethod
-    def _cast_view_types(batch: RecordBatch, target_fields: Set[str]) -> 
RecordBatch:
-        """Cast string_view/binary_view columns to string/binary, only for 
target fields."""
-        if not target_fields:
-            return batch
+    def _cast_view_types(batch: RecordBatch) -> RecordBatch:
+        """Cast all string_view/binary_view columns to string/binary."""
         columns = []
         fields = []
         changed = False
         for i in range(batch.num_columns):
             col = batch.column(i)
             field = batch.schema.field(i)
-            if field.name in target_fields:
-                if col.type == pa.string_view():
-                    col = col.cast(pa.utf8())
-                    field = field.with_type(pa.utf8())
-                    changed = True
-                elif col.type == pa.binary_view():
-                    col = col.cast(pa.binary())
-                    field = field.with_type(pa.binary())
-                    changed = True
+            if col.type == pa.string_view():
+                col = col.cast(pa.utf8())
+                field = field.with_type(pa.utf8())
+                changed = True
+            elif col.type == pa.binary_view():
+                col = col.cast(pa.binary())
+                field = field.with_type(pa.binary())
+                changed = True
             columns.append(col)
             fields.append(field)
         if changed:
@@ -118,7 +112,7 @@ class FormatVortexReader(RecordBatchReader):
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         try:
             batch = next(self.record_batch_reader)
-            batch = self._cast_view_types(batch, self._cast_fields)
+            batch = self._cast_view_types(batch)
 
             if not self.missing_fields:
                 return batch
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index 7261503450..042c47699a 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -29,8 +29,8 @@ from pypaimon.table.file_store_table import FileStoreTable
 from pypaimon.write.commit_message import CommitMessage
 
 
-class DataBlobWriterTest(unittest.TestCase):
-    """Tests for DataBlobWriter functionality with paimon table operations."""
+class DedicatedFormatWriterTest(unittest.TestCase):
+    """Tests for DedicatedFormatWriter functionality with paimon table 
operations."""
 
     @classmethod
     def setUpClass(cls):
@@ -51,8 +51,8 @@ class DataBlobWriterTest(unittest.TestCase):
         except OSError:
             pass
 
-    def test_data_blob_writer_basic_functionality(self):
-        """Test basic DataBlobWriter functionality with paimon table."""
+    def test_dedicated_format_writer_basic_functionality(self):
+        """Test basic DedicatedFormatWriter functionality with paimon table."""
         from pypaimon import Schema
 
         # Create schema with normal and blob columns
@@ -82,7 +82,7 @@ class DataBlobWriterTest(unittest.TestCase):
             'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3']
         }, schema=pa_schema)
 
-        # Test DataBlobWriter initialization using proper table API
+        # Test DedicatedFormatWriter initialization using proper table API
         # Use proper table API to create writer
         write_builder = table.new_batch_write_builder()
         blob_writer = write_builder.new_write()
@@ -108,8 +108,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         blob_writer.close()
 
-    def test_data_blob_writer_schema_detection(self):
-        """Test that DataBlobWriter correctly detects blob columns from 
schema."""
+    def test_dedicated_format_writer_schema_detection(self):
+        """Test that DedicatedFormatWriter correctly detects blob columns from 
schema."""
         from pypaimon import Schema
 
         # Test schema with blob column
@@ -132,7 +132,7 @@ class DataBlobWriterTest(unittest.TestCase):
         write_builder = table.new_batch_write_builder()
         blob_writer = write_builder.new_write()
 
-        # Test that DataBlobWriter was created internally
+        # Test that DedicatedFormatWriter was created internally
         # We can verify this by checking the internal data writers
         test_data = pa.Table.from_pydict({
             'id': [1, 2, 3],
@@ -142,19 +142,19 @@ class DataBlobWriterTest(unittest.TestCase):
         # Write data to trigger writer creation
         blob_writer.write_arrow(test_data)
 
-        # Verify that a DataBlobWriter was created internally
+        # Verify that a DedicatedFormatWriter was created internally
         data_writers = blob_writer.file_store_write.data_writers
         self.assertGreater(len(data_writers), 0)
 
-        # Check that the writer is a DataBlobWriter
+        # Check that the writer is a DedicatedFormatWriter
         for writer in data_writers.values():
-            from pypaimon.write.writer.data_blob_writer import DataBlobWriter
-            self.assertIsInstance(writer, DataBlobWriter)
+            from pypaimon.write.writer.dedicated_format_writer import 
DedicatedFormatWriter
+            self.assertIsInstance(writer, DedicatedFormatWriter)
 
         blob_writer.close()
 
-    def test_data_blob_writer_no_blob_column(self):
-        """Test that DataBlobWriter raises error when no blob column is 
found."""
+    def test_dedicated_format_writer_no_blob_column(self):
+        """Test that DedicatedFormatWriter raises error when no blob column is 
found."""
         from pypaimon import Schema
 
         # Test schema without blob column
@@ -177,7 +177,7 @@ class DataBlobWriterTest(unittest.TestCase):
         write_builder = table.new_batch_write_builder()
         writer = write_builder.new_write()
 
-        # Test that a regular writer (not DataBlobWriter) was created
+        # Test that a regular writer (not DedicatedFormatWriter) was created
         test_data = pa.Table.from_pydict({
             'id': [1, 2, 3],
             'name': ['Alice', 'Bob', 'Charlie']
@@ -186,19 +186,19 @@ class DataBlobWriterTest(unittest.TestCase):
         # Write data to trigger writer creation
         writer.write_arrow(test_data)
 
-        # Verify that a regular writer was created (not DataBlobWriter)
+        # Verify that a regular writer was created (not DedicatedFormatWriter)
         data_writers = writer.file_store_write.data_writers
         self.assertGreater(len(data_writers), 0)
 
-        # Check that the writer is NOT a DataBlobWriter
+        # Check that the writer is NOT a DedicatedFormatWriter
         for writer_instance in data_writers.values():
-            from pypaimon.write.writer.data_blob_writer import DataBlobWriter
-            self.assertNotIsInstance(writer_instance, DataBlobWriter)
+            from pypaimon.write.writer.dedicated_format_writer import 
DedicatedFormatWriter
+            self.assertNotIsInstance(writer_instance, DedicatedFormatWriter)
 
         writer.close()
 
-    def test_data_blob_writer_multiple_blob_columns(self):
-        """Test that DataBlobWriter supports multiple blob columns."""
+    def test_dedicated_format_writer_multiple_blob_columns(self):
+        """Test that DedicatedFormatWriter supports multiple blob columns."""
         from pypaimon import Schema
 
         # Test schema with multiple blob columns
@@ -242,7 +242,7 @@ class DataBlobWriterTest(unittest.TestCase):
         result = 
table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
         self.assertEqual(result.num_rows, 3)
 
-    def test_data_blob_writer_partial_write_with_write_type(self):
+    def test_dedicated_format_writer_partial_write_with_write_type(self):
         """Partial write (normal + blob subset) via with_write_type: split 
must match batch columns."""
         from pypaimon import Schema
 
@@ -292,7 +292,7 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(out.column('blob_data').to_pylist(), [b'a', b'b'])
         self.assertEqual(out.column('name').to_pylist(), [None, None])
 
-    def test_data_blob_writer_partial_write_normal_only_with_write_type(self):
+    def 
test_dedicated_format_writer_partial_write_normal_only_with_write_type(self):
         """Partial write without blob columns in write_cols must not touch 
blob split paths."""
         from pypaimon import Schema
 
@@ -333,7 +333,7 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(out.column('name').to_pylist(), ['n'])
         self.assertEqual(out.column('blob_data').to_pylist(), [None])
 
-    def 
test_data_blob_writer_partial_write_single_blob_of_two_with_write_type(self):
+    def 
test_dedicated_format_writer_partial_write_single_blob_of_two_with_write_type(self):
         """with_write_type lists only one blob column: only that column gets 
.blob files."""
         from pypaimon import Schema
 
@@ -369,8 +369,8 @@ class DataBlobWriterTest(unittest.TestCase):
         write_builder.new_commit().commit(commit_messages)
         writer.close()
 
-    def test_data_blob_writer_write_operations(self):
-        """Test DataBlobWriter write operations with real data."""
+    def test_dedicated_format_writer_write_operations(self):
+        """Test DedicatedFormatWriter write operations with real data."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -411,8 +411,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         blob_writer.close()
 
-    def test_data_blob_writer_write_large_blob(self):
-        """Test DataBlobWriter with large blob data (5MB per item) in 10 
batches."""
+    def test_dedicated_format_writer_write_large_blob(self):
+        """Test DedicatedFormatWriter with very large blob data (50MB per 
item) in 10 batches."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -467,7 +467,7 @@ class DataBlobWriterTest(unittest.TestCase):
             # Log progress for large data processing
             print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows} 
rows")
 
-        # Record count is tracked internally by DataBlobWriter
+        # Record count is tracked internally by DedicatedFormatWriter
 
         # Test prepare commit
         commit_messages: CommitMessage = blob_writer.prepare_commit()
@@ -511,8 +511,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         blob_writer.close()
 
-    def test_data_blob_writer_abort_functionality(self):
-        """Test DataBlobWriter abort functionality."""
+    def test_dedicated_format_writer_abort_functionality(self):
+        """Test DedicatedFormatWriter abort functionality."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -546,12 +546,12 @@ class DataBlobWriterTest(unittest.TestCase):
             blob_writer.write_arrow_batch(batch)
 
         # Test abort - BatchTableWrite doesn't have abort method
-        # The abort functionality is handled internally by DataBlobWriter
+        # The abort functionality is handled internally by 
DedicatedFormatWriter
 
         blob_writer.close()
 
-    def test_data_blob_writer_multiple_batches(self):
-        """Test DataBlobWriter with multiple batches and verify results."""
+    def test_dedicated_format_writer_multiple_batches(self):
+        """Test DedicatedFormatWriter with multiple batches and verify 
results."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -608,7 +608,7 @@ class DataBlobWriterTest(unittest.TestCase):
             blob_writer.write_arrow_batch(batch)
             total_rows += batch.num_rows
 
-        # Record count is tracked internally by DataBlobWriter
+        # Record count is tracked internally by DedicatedFormatWriter
 
         # Test prepare commit
         commit_messages = blob_writer.prepare_commit()
@@ -619,8 +619,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         blob_writer.close()
 
-    def test_data_blob_writer_large_batches(self):
-        """Test DataBlobWriter with large batches to test rolling behavior."""
+    def test_dedicated_format_writer_large_batches(self):
+        """Test DedicatedFormatWriter with large batches to test rolling 
behavior."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -671,7 +671,7 @@ class DataBlobWriterTest(unittest.TestCase):
             blob_writer.write_arrow_batch(batch)
             total_rows += batch.num_rows
 
-        # Record count is tracked internally by DataBlobWriter
+        # Record count is tracked internally by DedicatedFormatWriter
 
         # Test prepare commit
         commit_messages = blob_writer.prepare_commit()
@@ -682,8 +682,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         blob_writer.close()
 
-    def test_data_blob_writer_mixed_data_types(self):
-        """Test DataBlobWriter with mixed data types in blob column."""
+    def test_dedicated_format_writer_mixed_data_types(self):
+        """Test DedicatedFormatWriter with mixed data types in blob column."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -726,7 +726,7 @@ class DataBlobWriterTest(unittest.TestCase):
             blob_writer.write_arrow_batch(batch)
             total_rows += batch.num_rows
 
-        # Record count is tracked internally by DataBlobWriter
+        # Record count is tracked internally by DedicatedFormatWriter
 
         # Test prepare commit
         commit_messages = blob_writer.prepare_commit()
@@ -786,8 +786,8 @@ class DataBlobWriterTest(unittest.TestCase):
             self.assertEqual(result_type, original_type, f"Row {i + 1}: Type 
should match")
             self.assertEqual(result_data, original_data, f"Row {i + 1}: Blob 
data should match")
 
-    def test_data_blob_writer_empty_batches(self):
-        """Test DataBlobWriter with empty batches."""
+    def test_dedicated_format_writer_empty_batches(self):
+        """Test DedicatedFormatWriter with empty batches."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -842,8 +842,8 @@ class DataBlobWriterTest(unittest.TestCase):
             total_rows += batch.num_rows
 
         # Verify record count (empty batch should not affect count)
-        # Record count is tracked internally by DataBlobWriter
-        # Record count is tracked internally by DataBlobWriter
+        # Record count is tracked internally by DedicatedFormatWriter
+        # Record count is tracked internally by DedicatedFormatWriter
 
         # Test prepare commit
         commit_messages = blob_writer.prepare_commit()
@@ -851,8 +851,8 @@ class DataBlobWriterTest(unittest.TestCase):
 
         blob_writer.close()
 
-    def test_data_blob_writer_rolling_behavior(self):
-        """Test DataBlobWriter rolling behavior with multiple commits."""
+    def test_dedicated_format_writer_rolling_behavior(self):
+        """Test DedicatedFormatWriter rolling behavior with multiple 
commits."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -892,7 +892,7 @@ class DataBlobWriterTest(unittest.TestCase):
                 blob_writer.write_arrow_batch(batch)
 
         # Verify total record count
-        # Record count is tracked internally by DataBlobWriter
+        # Record count is tracked internally by DedicatedFormatWriter
 
         # Test prepare commit
         commit_messages = blob_writer.prepare_commit()
@@ -2714,8 +2714,8 @@ class DataBlobWriterTest(unittest.TestCase):
         actual = pa.concat_tables([actual1, actual2, actual3]).sort_by('id')
         self.assertEqual(actual, expected)
 
-    def test_data_blob_writer_with_slice(self):
-        """Test DataBlobWriter with mixed data types in blob column."""
+    def test_dedicated_format_writer_with_slice(self):
+        """Test DedicatedFormatWriter with mixed data types in blob column."""
 
         # Create schema with blob column
         pa_schema = pa.schema([
@@ -2776,8 +2776,8 @@ class DataBlobWriterTest(unittest.TestCase):
         self.assertEqual(result.num_columns, 3, "Should have 3 columns")
         self.assertEqual(result["id"].unique().to_pylist(), [2, 3], "Get 
incorrect column ID")
 
-    def test_data_blob_writer_with_shard(self):
-        """Test DataBlobWriter with mixed data types in blob column."""
+    def test_dedicated_format_writer_with_shard(self):
+        """Test DedicatedFormatWriter with mixed data types in blob column."""
 
         # Create schema with blob column
         pa_schema = pa.schema([
diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py 
b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
new file mode 100644
index 0000000000..f89f3290f1
--- /dev/null
+++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py
@@ -0,0 +1,1063 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Data evolution tests covering parquet + blob + vector (vortex) formats.
+
+Each test writes data using different file format combinations and reads it
+back, verifying correctness of the data evolution merge path across formats.
+"""
+
+import os
+import shutil
+import sys
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+
+
+class DataEvolutionFormatsTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    # ------------------------------------------------------------------
+    # Parquet-format data evolution
+    # ------------------------------------------------------------------
+
+    def test_parquet_column_subset_write_and_merge_read(self):
+        """Write disjoint column subsets as parquet, merge-read via data 
evolution."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('score', pa.float64()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_parquet_subset', schema, False)
+        table = self.catalog.get_table('default.fmt_parquet_subset')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: write id + name
+        w0 = wb.new_write().with_write_type(['id', 'name'])
+        w1 = wb.new_write().with_write_type(['score'])
+        c = wb.new_commit()
+        w0.write_arrow(pa.Table.from_pydict(
+            {'id': [1, 2, 3], 'name': ['a', 'b', 'c']},
+            schema=pa.schema([('id', pa.int32()), ('name', pa.string())])))
+        w1.write_arrow(pa.Table.from_pydict(
+            {'score': [1.1, 2.2, 3.3]},
+            schema=pa.schema([('score', pa.float64())])))
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # verify file format
+        all_files = [nf for m in cmts for nf in m.new_files]
+        for f in all_files:
+            self.assertTrue(f.file_name.endswith('.parquet'),
+                            f"Expected parquet file, got {f.file_name}")
+
+        # read back
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        expect = pa.Table.from_pydict(
+            {'id': [1, 2, 3], 'name': ['a', 'b', 'c'], 'score': [1.1, 2.2, 
3.3]},
+            schema=pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_parquet_overwrite_column(self):
+        """Write all columns, then overwrite one column via a second commit."""
+        pa_schema = pa.schema([
+            ('k', pa.int64()),
+            ('v', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_parquet_overwrite', schema, 
False)
+        table = self.catalog.get_table('default.fmt_parquet_overwrite')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: full row
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'k': [10, 20], 'v': ['old1', 'old2']}, schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: overwrite v only (first_row_id=0)
+        tw = wb.new_write().with_write_type(['v'])
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'v': ['new1', 'new2']}, schema=pa.schema([('v', pa.string())])))
+        cmts = tw.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        expect = pa.Table.from_pydict(
+            {'k': [10, 20], 'v': ['new1', 'new2']}, schema=pa_schema)
+        self.assertEqual(actual, expect)
+
+    def test_parquet_append_new_rows(self):
+        """Append new rows (new first_row_id) with column subsets, merge-read 
all."""
+        pa_schema = pa.schema([
+            ('a', pa.int32()),
+            ('b', pa.string()),
+            ('c', pa.float32()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_parquet_append', schema, False)
+        table = self.catalog.get_table('default.fmt_parquet_append')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: 2 full rows
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'a': [1, 2], 'b': ['x', 'y'], 'c': [0.1, 0.2]}, schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: append 2 new rows with column subsets, first_row_id=2
+        w_ab = wb.new_write().with_write_type(['a', 'b'])
+        w_c = wb.new_write().with_write_type(['c'])
+        tc = wb.new_commit()
+        w_ab.write_arrow(pa.Table.from_pydict(
+            {'a': [3, 4], 'b': ['z', 'w']},
+            schema=pa.schema([('a', pa.int32()), ('b', pa.string())])))
+        w_c.write_arrow(pa.Table.from_pydict(
+            {'c': [0.3, 0.4]},
+            schema=pa.schema([('c', pa.float32())])))
+        cmts = w_ab.prepare_commit() + w_c.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 2
+        tc.commit(cmts)
+        w_ab.close()
+        w_c.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 4)
+        expect = pa.Table.from_pydict(
+            {'a': [1, 2, 3, 4], 'b': ['x', 'y', 'z', 'w'],
+             'c': [0.1, 0.2, 0.3, 0.4]},
+            schema=pa_schema)
+        self.assertEqual(actual, expect)
+
+    # ------------------------------------------------------------------
+    # Blob-format data evolution
+    # ------------------------------------------------------------------
+
+    def test_blob_write_and_read(self):
+        """Write a table with normal + blob columns, read back and verify."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('payload', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_blob_basic', schema, False)
+        table = self.catalog.get_table('default.fmt_blob_basic')
+        wb = table.new_batch_write_builder()
+
+        blobs = [b'hello world', b'\x00\x01\x02\xff', b'paimon blob']
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'id': [1, 2, 3], 'payload': blobs}, schema=pa_schema))
+        cmts = tw.prepare_commit()
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        # verify we produced both parquet and blob files
+        all_files = [nf for m in cmts for nf in m.new_files]
+        parquet_files = [f for f in all_files if 
f.file_name.endswith('.parquet')]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        self.assertGreater(len(parquet_files), 0)
+        self.assertGreater(len(blob_files), 0)
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+        self.assertEqual(actual.column('payload').to_pylist(), blobs)
+
+    def test_blob_column_subset_evolution(self):
+        """Write normal+blob cols in one commit, overwrite normal col in 
another, merge-read."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('doc', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_blob_evolution', schema, False)
+        table = self.catalog.get_table('default.fmt_blob_evolution')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: write id + doc (normal + blob together)
+        tw = wb.new_write().with_write_type(['id', 'doc'])
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'id': [1, 2], 'doc': [b'doc_alice', b'doc_bob']},
+            schema=pa.schema([('id', pa.int32()), ('doc', 
pa.large_binary())])))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: write name for the same rows (first_row_id=0)
+        tw = wb.new_write().with_write_type(['name'])
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'name': ['Alice', 'Bob']},
+            schema=pa.schema([('name', pa.string())])))
+        cmts = tw.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 2)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2])
+        self.assertEqual(actual.column('name').to_pylist(), ['Alice', 'Bob'])
+        self.assertEqual(actual.column('doc').to_pylist(), [b'doc_alice', 
b'doc_bob'])
+
+    def test_blob_append_with_subset_evolution(self):
+        """Write normal+blob subset in first commit, add remaining col via 
evolution."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('tag', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_blob_append_evo', schema, False)
+        table = self.catalog.get_table('default.fmt_blob_append_evo')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: id + picture (normal + blob)
+        tw = wb.new_write().with_write_type(['id', 'picture'])
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'id': [1, 2], 'picture': [b'pic1', b'pic2']},
+            schema=pa.schema([('id', pa.int32()), ('picture', 
pa.large_binary())])))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: add tag for the same rows
+        tw = wb.new_write().with_write_type(['tag'])
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'tag': ['t1', 't2']},
+            schema=pa.schema([('tag', pa.string())])))
+        cmts = tw.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 2)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2])
+        self.assertEqual(actual.column('tag').to_pylist(), ['t1', 't2'])
+        self.assertEqual(actual.column('picture').to_pylist(), [b'pic1', 
b'pic2'])
+
+    def test_blob_multiple_blob_columns(self):
+        """Table with two blob columns, write and read both."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('audio', pa.large_binary()),
+            ('video', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_blob_multi', schema, False)
+        table = self.catalog.get_table('default.fmt_blob_multi')
+        wb = table.new_batch_write_builder()
+
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict({
+            'id': [1, 2],
+            'audio': [b'audio_1', b'audio_2'],
+            'video': [b'video_1', b'video_2'],
+        }, schema=pa_schema))
+        cmts = tw.prepare_commit()
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        # verify blob files were produced
+        all_files = [nf for m in cmts for nf in m.new_files]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        self.assertGreaterEqual(len(blob_files), 2)
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 2)
+        self.assertEqual(actual.column('audio').to_pylist(), [b'audio_1', 
b'audio_2'])
+        self.assertEqual(actual.column('video').to_pylist(), [b'video_1', 
b'video_2'])
+
+    # ------------------------------------------------------------------
+    # Vortex-format data evolution
+    # ------------------------------------------------------------------
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    @unittest.skipUnless(
+        __import__('importlib').util.find_spec('vortex') is not None,
+        "vortex not installed")
+    def test_vortex_column_subset_write_and_merge_read(self):
+        """Write disjoint column subsets as vortex, merge-read via data 
evolution."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('tag', pa.string()),
+            ('val', pa.float64()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'vortex',
+        })
+        self.catalog.create_table('default.fmt_vortex_subset', schema, False)
+        table = self.catalog.get_table('default.fmt_vortex_subset')
+        wb = table.new_batch_write_builder()
+
+        w0 = wb.new_write().with_write_type(['id', 'tag'])
+        w1 = wb.new_write().with_write_type(['val'])
+        c = wb.new_commit()
+        w0.write_arrow(pa.Table.from_pydict(
+            {'id': [10, 20, 30], 'tag': ['p', 'q', 'r']},
+            schema=pa.schema([('id', pa.int32()), ('tag', pa.string())])))
+        w1.write_arrow(pa.Table.from_pydict(
+            {'val': [1.5, 2.5, 3.5]},
+            schema=pa.schema([('val', pa.float64())])))
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        # verify vortex files
+        all_files = [nf for m in cmts for nf in m.new_files]
+        for f in all_files:
+            self.assertTrue(f.file_name.endswith('.vortex'),
+                            f"Expected vortex file, got {f.file_name}")
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        expect = pa.Table.from_pydict(
+            {'id': [10, 20, 30], 'tag': ['p', 'q', 'r'], 'val': [1.5, 2.5, 
3.5]},
+            schema=pa_schema)
+        self.assertEqual(actual, expect)
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    @unittest.skipUnless(
+        __import__('importlib').util.find_spec('vortex') is not None,
+        "vortex not installed")
+    def test_vortex_overwrite_column(self):
+        """Full row write then overwrite one column, all in vortex format."""
+        pa_schema = pa.schema([
+            ('k', pa.int64()),
+            ('v', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'vortex',
+        })
+        self.catalog.create_table('default.fmt_vortex_overwrite', schema, 
False)
+        table = self.catalog.get_table('default.fmt_vortex_overwrite')
+        wb = table.new_batch_write_builder()
+
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'k': [100, 200], 'v': ['old', 'old']}, schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        tw = wb.new_write().with_write_type(['v'])
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'v': ['new', 'new']}, schema=pa.schema([('v', pa.string())])))
+        cmts = tw.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual, pa.Table.from_pydict(
+            {'k': [100, 200], 'v': ['new', 'new']}, schema=pa_schema))
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    @unittest.skipUnless(
+        __import__('importlib').util.find_spec('vortex') is not None,
+        "vortex not installed")
+    def test_vortex_append_new_rows(self):
+        """Append new rows with column subsets in vortex format."""
+        pa_schema = pa.schema([
+            ('x', pa.int32()),
+            ('y', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'vortex',
+        })
+        self.catalog.create_table('default.fmt_vortex_append', schema, False)
+        table = self.catalog.get_table('default.fmt_vortex_append')
+        wb = table.new_batch_write_builder()
+
+        # commit 1
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'x': [1, 2], 'y': ['a', 'b']}, schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: append with subsets, first_row_id=2
+        w_x = wb.new_write().with_write_type(['x'])
+        w_y = wb.new_write().with_write_type(['y'])
+        tc = wb.new_commit()
+        w_x.write_arrow(pa.Table.from_pydict(
+            {'x': [3]}, schema=pa.schema([('x', pa.int32())])))
+        w_y.write_arrow(pa.Table.from_pydict(
+            {'y': ['c']}, schema=pa.schema([('y', pa.string())])))
+        cmts = w_x.prepare_commit() + w_y.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 2
+        tc.commit(cmts)
+        w_x.close()
+        w_y.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        expect = pa.Table.from_pydict(
+            {'x': [1, 2, 3], 'y': ['a', 'b', 'c']}, schema=pa_schema)
+        self.assertEqual(actual, expect)
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    @unittest.skipUnless(
+        __import__('importlib').util.find_spec('vortex') is not None,
+        "vortex not installed")
+    def test_vortex_with_row_id_and_filter(self):
+        """Write vortex data, read with _ROW_ID projection and filter."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('val', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'vortex',
+        })
+        self.catalog.create_table('default.fmt_vortex_rowid_filter', schema, 
False)
+        table = self.catalog.get_table('default.fmt_vortex_rowid_filter')
+        wb = table.new_batch_write_builder()
+
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'id': list(range(10)), 'val': [f'v{i}' for i in range(10)]},
+            schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # full read
+        rb = table.new_read_builder()
+        full = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(full.num_rows, 10)
+
+        # filter by _ROW_ID
+        rb_rid = table.new_read_builder().with_projection(['id', 'val', 
'_ROW_ID'])
+        pb = rb_rid.new_predicate_builder()
+        rb_f = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 5))
+        actual = rb_f.new_read().to_arrow(rb_f.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 1)
+        self.assertEqual(actual.column('id')[0].as_py(), 5)
+        self.assertEqual(actual.column('val')[0].as_py(), 'v5')
+
+    # ------------------------------------------------------------------
+    # Vector (vortex) file format for embedding columns
+    # ------------------------------------------------------------------
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    @unittest.skipUnless(
+        __import__('importlib').util.find_spec('vortex') is not None,
+        "vortex not installed")
+    def test_vector_vortex_write_and_read(self):
+        """Write table with normal + vector columns using vortex vector 
format."""
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('embed', pa.list_(pa.float32(), 4)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'vortex',
+            'vector.file.format': 'vortex',
+        })
+        self.catalog.create_table('default.fmt_vec_vortex', schema, False)
+        table = self.catalog.get_table('default.fmt_vec_vortex')
+
+        embeddings = [1.0, 0.0, 0.0, 0.0,
+                      0.0, 1.0, 0.0, 0.0,
+                      0.0, 0.0, 1.0, 0.0]
+        test_data = pa.table({
+            'id': pa.array([1, 2, 3], type=pa.int64()),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array(embeddings, type=pa.float32()), 4),
+        })
+
+        wb = table.new_batch_write_builder()
+        tw = wb.new_write()
+        tw.write_arrow(test_data)
+        cmts = tw.prepare_commit()
+
+        # should produce both normal and vector files
+        all_files = [nf for m in cmts for nf in m.new_files]
+        normal_files = [f for f in all_files if not 
DataFileMeta.is_vector_file(f.file_name)]
+        vector_files = [f for f in all_files if 
DataFileMeta.is_vector_file(f.file_name)]
+        self.assertGreater(len(normal_files), 0)
+        self.assertGreater(len(vector_files), 0)
+        for vf in vector_files:
+            self.assertIn('.vector.vortex', vf.file_name)
+
+        wb.new_commit().commit(cmts)
+        tw.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+        embed_col = actual.column('embed')
+        self.assertEqual(embed_col[0].as_py(), [1.0, 0.0, 0.0, 0.0])
+        self.assertEqual(embed_col[1].as_py(), [0.0, 1.0, 0.0, 0.0])
+        self.assertEqual(embed_col[2].as_py(), [0.0, 0.0, 1.0, 0.0])
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    @unittest.skipUnless(
+        __import__('importlib').util.find_spec('vortex') is not None,
+        "vortex not installed")
+    def test_vector_vortex_multiple_appends(self):
+        """Append multiple batches of normal+vector data and read all back."""
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('label', pa.string()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'vortex',
+            'vector.file.format': 'vortex',
+        })
+        self.catalog.create_table('default.fmt_vec_vortex_append', schema, 
False)
+        table = self.catalog.get_table('default.fmt_vec_vortex_append')
+        wb = table.new_batch_write_builder()
+
+        # commit 1
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'id': pa.array([1, 2], type=pa.int64()),
+            'label': pa.array(['cat', 'dog']),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32()), 
3),
+        }))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: append
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'id': pa.array([3], type=pa.int64()),
+            'label': pa.array(['bird']),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.7, 0.8, 0.9], type=pa.float32()), 3),
+        }))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+        self.assertEqual(actual.column('label').to_pylist(), ['cat', 'dog', 
'bird'])
+        embed_col = actual.column('embed')
+        self.assertAlmostEqual(embed_col[0].as_py()[0], 0.1, places=5)
+        self.assertAlmostEqual(embed_col[2].as_py()[2], 0.9, places=5)
+
+    # ------------------------------------------------------------------
+    # Mixed formats: parquet + blob + vector in one table
+    # ------------------------------------------------------------------
+
+    def test_parquet_and_blob_mixed_append(self):
+        """Table with normal parquet cols + blob col, append new rows."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('image', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_mixed_parquet_blob', schema, 
False)
+        table = self.catalog.get_table('default.fmt_mixed_parquet_blob')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: first batch
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict({
+            'id': [1, 2],
+            'name': ['a', 'b'],
+            'image': [b'img1', b'img2'],
+        }, schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: append more rows
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict({
+            'id': [3, 4],
+            'name': ['c', 'd'],
+            'image': [b'img3', b'img4'],
+        }, schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 4)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3, 4])
+        self.assertEqual(actual.column('name').to_pylist(), ['a', 'b', 'c', 
'd'])
+        self.assertEqual(actual.column('image').to_pylist(),
+                         [b'img1', b'img2', b'img3', b'img4'])
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    @unittest.skipUnless(
+        __import__('importlib').util.find_spec('vortex') is not None,
+        "vortex not installed")
+    def test_vortex_and_vector_vortex_mixed(self):
+        """Table with normal (vortex) + vector (vortex) columns, write and 
read.
+
+        Verifies that the writer produces separate .vortex and .vector.vortex 
files,
+        and the data evolution merge reader stitches them back together.
+        """
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'vortex',
+            'vector.file.format': 'vortex',
+        })
+        self.catalog.create_table('default.fmt_vortex_vector', schema, False)
+        table = self.catalog.get_table('default.fmt_vortex_vector')
+        wb = table.new_batch_write_builder()
+
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'id': pa.array([1, 2, 3], type=pa.int64()),
+            'name': pa.array(['cat', 'dog', 'bird']),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9],
+                         type=pa.float32()), 3),
+        }))
+        cmts = tw.prepare_commit()
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        # verify two file types: .vortex + .vector.vortex
+        all_files = [nf for m in cmts for nf in m.new_files]
+        normal_files = [f for f in all_files if not 
DataFileMeta.is_vector_file(f.file_name)]
+        vector_files = [f for f in all_files if 
DataFileMeta.is_vector_file(f.file_name)]
+        self.assertGreater(len(normal_files), 0, "should produce normal vortex 
files")
+        self.assertGreater(len(vector_files), 0, "should produce vector files")
+        for nf in normal_files:
+            self.assertTrue(nf.file_name.endswith('.vortex'))
+        for vf in vector_files:
+            self.assertIn('.vector.vortex', vf.file_name)
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+        self.assertEqual(actual.column('name').to_pylist(), ['cat', 'dog', 
'bird'])
+        embed = actual.column('embed')
+        self.assertAlmostEqual(embed[0].as_py()[0], 0.1, places=5)
+        self.assertAlmostEqual(embed[2].as_py()[2], 0.9, places=5)
+
+    def test_blob_and_vector_inline_mixed(self):
+        """Table with normal + blob + vector(inline) columns, write and read.
+
+        When blob columns are present, vector columns are stored inline in the
+        parquet file (not as separate .vector files). This test verifies the
+        blob+inline-vector path works correctly.
+        """
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('doc', pa.large_binary()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_blob_vector_inline', schema, 
False)
+        table = self.catalog.get_table('default.fmt_blob_vector_inline')
+        wb = table.new_batch_write_builder()
+
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'id': pa.array([1, 2], type=pa.int64()),
+            'doc': pa.array([b'doc1', b'doc2'], type=pa.large_binary()),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.1, 0.2, 0.3, 0.4, 0.5, 0.6], type=pa.float32()), 
3),
+        }))
+        cmts = tw.prepare_commit()
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        # verify parquet + blob files
+        all_files = [nf for m in cmts for nf in m.new_files]
+        parquet_files = [f for f in all_files if 
f.file_name.endswith('.parquet')]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        self.assertGreater(len(parquet_files), 0, "should produce parquet 
files")
+        self.assertGreater(len(blob_files), 0, "should produce blob files")
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 2)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2])
+        self.assertEqual(actual.column('doc').to_pylist(), [b'doc1', b'doc2'])
+        embed = actual.column('embed')
+        self.assertAlmostEqual(embed[0].as_py()[0], 0.1, places=5)
+        self.assertAlmostEqual(embed[1].as_py()[2], 0.6, places=5)
+
+    def test_blob_and_vector_with_vector_file_format(self):
+        """Table with blob + vector columns and explicit vector.file.format.
+
+        DedicatedFormatWriter splits data three ways: normal columns to 
.parquet,
+        blob columns to .blob, and vector columns to .vector.<format> files.
+        """
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('doc', pa.large_binary()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_blob_vec_format', schema, False)
+        table = self.catalog.get_table('default.fmt_blob_vec_format')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: write all columns
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'id': pa.array([1, 2, 3], type=pa.int64()),
+            'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([1.0, 0.0, 0.0,
+                          0.0, 1.0, 0.0,
+                          0.0, 0.0, 1.0], type=pa.float32()), 3),
+        }))
+        cmts = tw.prepare_commit()
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        # DedicatedFormatWriter produces parquet + blob + vector files
+        all_files = [nf for m in cmts for nf in m.new_files]
+        parquet_files = [f for f in all_files
+                         if f.file_name.endswith('.parquet')
+                         and not DataFileMeta.is_vector_file(f.file_name)]
+        blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+        vector_files = [f for f in all_files if 
DataFileMeta.is_vector_file(f.file_name)]
+        self.assertGreater(len(parquet_files), 0, "should produce normal 
parquet files")
+        self.assertGreater(len(blob_files), 0, "should produce blob files")
+        self.assertGreater(len(vector_files), 0, "should produce vector files")
+        for vf in vector_files:
+            self.assertIn('.vector.parquet', vf.file_name)
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+        self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb', 
b'ccc'])
+        self.assertEqual(actual.column('embed')[0].as_py(), [1.0, 0.0, 0.0])
+        self.assertEqual(actual.column('embed')[1].as_py(), [0.0, 1.0, 0.0])
+        self.assertEqual(actual.column('embed')[2].as_py(), [0.0, 0.0, 1.0])
+
+        # commit 2: append more rows
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'id': pa.array([4, 5], type=pa.int64()),
+            'doc': pa.array([b'ddd', b'eee'], type=pa.large_binary()),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.5, 0.5, 0.0,
+                          0.0, 0.5, 0.5], type=pa.float32()), 3),
+        }))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        actual2 = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual2.num_rows, 5)
+        self.assertEqual(actual2.column('id').to_pylist(), [1, 2, 3, 4, 5])
+        self.assertEqual(actual2.column('doc').to_pylist(),
+                         [b'aaa', b'bbb', b'ccc', b'ddd', b'eee'])
+
+    def test_blob_vector_partial_write_vector_only(self):
+        """Blob+vector table with with_write_type(['embed']) — vector-only 
partial write.
+
+        When normal_column_names is empty, the writer must still flush vector
+        metadata without crashing on an empty normal data path.
+        """
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('doc', pa.large_binary()),
+            ('embed', pa.list_(pa.float32(), 3)),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_blob_vec_partial', schema, 
False)
+        table = self.catalog.get_table('default.fmt_blob_vec_partial')
+        wb = table.new_batch_write_builder()
+
+        # commit 1: write all columns
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'id': pa.array([1, 2, 3], type=pa.int64()),
+            'doc': pa.array([b'aaa', b'bbb', b'ccc'], type=pa.large_binary()),
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([1.0, 0.0, 0.0,
+                          0.0, 1.0, 0.0,
+                          0.0, 0.0, 1.0], type=pa.float32()), 3),
+        }))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        # commit 2: write only vector column — no normal columns
+        tw = wb.new_write().with_write_type(['embed'])
+        tc = wb.new_commit()
+        tw.write_arrow(pa.table({
+            'embed': pa.FixedSizeListArray.from_arrays(
+                pa.array([0.5, 0.5, 0.0,
+                          0.0, 0.5, 0.5,
+                          0.5, 0.0, 0.5], type=pa.float32()), 3),
+        }))
+        cmts = tw.prepare_commit()
+
+        # should produce only vector files, no normal or blob files
+        all_files = [nf for m in cmts for nf in m.new_files]
+        self.assertGreater(len(all_files), 0, "should produce vector files")
+        for f in all_files:
+            self.assertTrue(DataFileMeta.is_vector_file(f.file_name),
+                            f"Expected vector file, got {f.file_name}")
+
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        tc.commit(cmts)
+        tw.close()
+        tc.close()
+
+        # read back and verify the vector column was updated
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 3)
+        self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
+        self.assertEqual(actual.column('doc').to_pylist(), [b'aaa', b'bbb', 
b'ccc'])
+        embed = actual.column('embed')
+        self.assertEqual(embed[0].as_py(), [0.5, 0.5, 0.0])
+        self.assertEqual(embed[1].as_py(), [0.0, 0.5, 0.5])
+        self.assertEqual(embed[2].as_py(), [0.5, 0.0, 0.5])
+
+    # ------------------------------------------------------------------
+    # Projection and _ROW_ID across formats
+    # ------------------------------------------------------------------
+
+    def test_blob_with_row_id_projection(self):
+        """Read blob table with _ROW_ID projection."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('data', pa.large_binary()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        })
+        self.catalog.create_table('default.fmt_blob_rowid', schema, False)
+        table = self.catalog.get_table('default.fmt_blob_rowid')
+        wb = table.new_batch_write_builder()
+
+        tw = wb.new_write()
+        tc = wb.new_commit()
+        tw.write_arrow(pa.Table.from_pydict(
+            {'id': [10, 20], 'data': [b'aa', b'bb']}, schema=pa_schema))
+        tc.commit(tw.prepare_commit())
+        tw.close()
+        tc.close()
+
+        rb = table.new_read_builder()
+        rb.with_projection(['id', 'data', '_ROW_ID'])
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, 2)
+        self.assertEqual(actual.column('_ROW_ID').to_pylist(), [0, 1])
+        self.assertEqual(actual.column('id').to_pylist(), [10, 20])
+        self.assertEqual(actual.column('data').to_pylist(), [b'aa', b'bb'])
+
+    def test_parquet_large_data_evolution(self):
+        """Larger dataset: 1000 rows, column-subset write+merge."""
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('col_a', pa.string()),
+            ('col_b', pa.float64()),
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema, options={
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'file.format': 'parquet',
+        })
+        self.catalog.create_table('default.fmt_parquet_large', schema, False)
+        table = self.catalog.get_table('default.fmt_parquet_large')
+        wb = table.new_batch_write_builder()
+
+        n = 1000
+        w0 = wb.new_write().with_write_type(['id', 'col_a'])
+        w1 = wb.new_write().with_write_type(['col_b'])
+        c = wb.new_commit()
+        w0.write_arrow(pa.Table.from_pydict(
+            {'id': list(range(n)), 'col_a': [f's{i}' for i in range(n)]},
+            schema=pa.schema([('id', pa.int32()), ('col_a', pa.string())])))
+        w1.write_arrow(pa.Table.from_pydict(
+            {'col_b': [float(i) for i in range(n)]},
+            schema=pa.schema([('col_b', pa.float64())])))
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for m in cmts:
+            for nf in m.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        rb = table.new_read_builder()
+        actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(actual.num_rows, n)
+        self.assertEqual(actual.column('id').to_pylist(), list(range(n)))
+        self.assertEqual(actual.column('col_a').to_pylist(), [f's{i}' for i in 
range(n)])
+        self.assertEqual(actual.column('col_b').to_pylist(), [float(i) for i 
in range(n)])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index c31a9f8a91..6a20708a14 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -23,7 +23,7 @@ import pyarrow as pa
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter
-from pypaimon.write.writer.data_blob_writer import DataBlobWriter
+from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter
 from pypaimon.write.writer.data_vector_writer import DataVectorWriter
 from pypaimon.write.writer.data_writer import DataWriter
 from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter
@@ -65,7 +65,7 @@ class FileStoreWrite:
 
         # Check if table has blob columns
         if self._has_blob_columns():
-            return DataBlobWriter(
+            return DedicatedFormatWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py 
b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
similarity index 73%
rename from paimon-python/pypaimon/write/writer/data_blob_writer.py
rename to paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index cb131f9a54..6e7ea57781 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -25,53 +25,25 @@ from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.data.timestamp import Timestamp
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.schema.data_types import VectorType
 from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.write.writer.data_writer import DataWriter
 
 logger = logging.getLogger(__name__)
 
 
-class DataBlobWriter(DataWriter):
-    """
-    A rolling file writer that handles both normal data and blob data. This 
writer creates separate
-    files for normal columns and blob columns, managing their lifecycle 
independently.
-
-    For example, given a table schema with normal columns (id INT, name 
STRING) and blob columns
-    (pic1 BLOB, pic2 BLOB), this writer will create separate files for normal 
columns and each
-    blob-file column.
-
-    Key features:
-    - Blob data can roll independently when normal data doesn't need rolling
-    - When normal data rolls, blob data MUST also be closed (Java behavior)
-    - Blob data uses more aggressive rolling (smaller target size) to prevent 
memory issues
-    - One normal data file may correspond to multiple blob data files
-    - Blob data is written immediately to disk to prevent memory corruption
-    - Blob file metadata is stored as separate DataFileMeta objects after 
normal file metadata
-    - When TableWrite.with_write_type narrows columns, incoming batches only 
carry that subset;
-      column lists are narrowed accordingly so splitting never selects missing 
columns.
-
-    Rolling behavior:
-    - Normal data rolls: Both normal and blob writers are closed together, 
blob metadata added after normal metadata
-    - Blob data rolls independently: Only blob writer is closed, blob metadata 
is cached until normal data rolls
-
-    Metadata organization:
-    - Normal file metadata is added first to committed_files
-    - Blob file metadata is added after normal file metadata in committed_files
-    - When blob rolls independently, metadata is cached until normal data rolls
-    - Result: [normal_meta, blob_meta1, blob_meta2, blob_meta3, ...]
-
-    Example file organization:
-    committed_files = [
-        normal_file1_meta,    # f1.parquet metadata
-        blob_file1_meta,      # b1.blob metadata
-        blob_file2_meta,      # b2.blob metadata
-        blob_file3_meta,      # b3.blob metadata
-        normal_file2_meta,    # f1-2.parquet metadata
-        blob_file4_meta,      # b4.blob metadata
-        blob_file5_meta,      # b5.blob metadata
-    ]
-
-    This matches the Java RollingBlobFileWriter behavior exactly.
+class DedicatedFormatWriter(DataWriter):
+    """A rolling file writer that writes normal, blob, and vector columns to 
dedicated files.
+
+    Splits incoming data three ways:
+    - Normal columns → standard data files (.parquet / .orc / .vortex / …)
+    - Blob columns (large_binary) → .blob files
+    - Vector columns (when vector.file.format is configured) → 
.vector.<format> files
+
+    This mirrors Java's DedicatedFormatRollingFileWriter.
+
+    Metadata order in committed_files:
+        [normal_meta, blob_meta1, …, vector_meta1, …]
     """
 
     # Constant for checking rolling condition periodically
@@ -101,6 +73,13 @@ class DataBlobWriter(DataWriter):
         full_blob_file_set = set(full_blob_file_column_names)
         all_column_names = self.table.field_names
 
+        # Detect vector columns that should be written to dedicated files.
+        full_vector_column_names = self._get_vector_columns_from_schema()
+        full_vector_set = set(full_vector_column_names)
+        # Only split vector columns when vector.file.format is configured.
+        has_dedicated_vector = bool(full_vector_column_names) and 
options.with_vector_format()
+        dedicated_set = full_blob_file_set | (full_vector_set if 
has_dedicated_vector else set())
+
         # Narrow columns when TableWrite.with_write_type(...) supplies a 
partial column list.
         # Incoming RecordBatches only contain those columns; selecting full 
normal/blob lists
         # would raise KeyError.
@@ -109,13 +88,17 @@ class DataBlobWriter(DataWriter):
             self.blob_file_column_names = [
                 col for col in full_blob_file_column_names if col in 
write_col_set
             ]
+            self.vector_write_columns = [
+                col for col in full_vector_column_names if col in write_col_set
+            ] if has_dedicated_vector else []
             self.normal_column_names = [
-                col for col in write_cols if col not in full_blob_file_set
+                col for col in write_cols if col not in dedicated_set
             ]
         else:
             self.blob_file_column_names = list(full_blob_file_column_names)
+            self.vector_write_columns = list(full_vector_column_names) if 
has_dedicated_vector else []
             self.normal_column_names = [
-                col for col in all_column_names if col not in 
full_blob_file_set
+                col for col in all_column_names if col not in dedicated_set
             ]
         normal_name_set = set(self.normal_column_names)
         self.normal_columns = [
@@ -143,6 +126,20 @@ class DataBlobWriter(DataWriter):
                 options=options
             )
 
+        # Initialize vector writer when vector.file.format is configured.
+        from pypaimon.write.writer.vector_writer import VectorWriter
+        self.vector_writer: Optional[VectorWriter] = None
+        if self.vector_write_columns:
+            self.vector_writer = VectorWriter(
+                table=self.table,
+                partition=self.partition,
+                bucket=self.bucket,
+                max_seq_number=max_seq_number,
+                vector_columns=self.vector_write_columns,
+                vector_file_format=options.vector_file_format(),
+                options=options,
+            )
+
         # Initialize ExternalStorageBlobWriter if configured
         self._external_storage_writer = None
         external_storage_fields = self.options.blob_external_storage_fields()
@@ -159,10 +156,11 @@ class DataBlobWriter(DataWriter):
             )
 
         logger.info(
-            "Initialized DataBlobWriter with blob columns: %s, blob file 
columns: %s, descriptor "
-            "stored columns: %s, external storage fields: %s",
+            "Initialized DedicatedFormatWriter with blob columns: %s, blob 
file columns: %s, "
+            "vector columns: %s, descriptor stored columns: %s, external 
storage fields: %s",
             self.blob_column_names,
             self.blob_file_column_names,
+            self.vector_write_columns,
             sorted(self.blob_descriptor_fields),
             sorted(external_storage_fields) if external_storage_fields else [],
         )
@@ -178,8 +176,14 @@ class DataBlobWriter(DataWriter):
             raise ValueError("No blob field found in table schema.")
         return blob_columns
 
+    def _get_vector_columns_from_schema(self) -> List[str]:
+        return [
+            field.name for field in self.table.table_schema.fields
+            if isinstance(field.type, VectorType)
+        ]
+
     def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
-        normal_data, _ = self._split_data(data)
+        normal_data, _, _ = self._split_data(data)
         return normal_data
 
     def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> 
pa.Table:
@@ -192,22 +196,27 @@ class DataBlobWriter(DataWriter):
             if self._external_storage_writer:
                 data = self._external_storage_writer.transform_batch(data)
 
-            # Split data into normal and blob parts
-            normal_data, blob_data_map = self._split_data(data)
+            # Split data into normal, blob, and vector parts
+            normal_data, blob_data_map, vector_data = self._split_data(data)
             self._validate_descriptor_stored_fields_input(data)
 
-            # Process and accumulate normal data
+            # Process and accumulate normal data (may be None for partial 
writes)
             processed_normal = self._process_normal_data(normal_data)
-            if self.pending_normal_data is None:
-                self.pending_normal_data = processed_normal
-            else:
-                self.pending_normal_data = 
self._merge_normal_data(self.pending_normal_data, processed_normal)
+            if processed_normal is not None:
+                if self.pending_normal_data is None:
+                    self.pending_normal_data = processed_normal
+                else:
+                    self.pending_normal_data = 
self._merge_normal_data(self.pending_normal_data, processed_normal)
 
             # Write blob-file columns to dedicated blob writers.
             for blob_column, blob_data in blob_data_map.items():
                 if blob_data is not None and blob_data.num_rows > 0:
                     self.blob_writers[blob_column].write(blob_data)
 
+            # Write vector columns to dedicated vector writer.
+            if self.vector_writer is not None and vector_data is not None and 
vector_data.num_rows > 0:
+                self.vector_writer.write(vector_data)
+
             self.record_count += data.num_rows
 
             # Check if normal data rolling is needed
@@ -231,8 +240,7 @@ class DataBlobWriter(DataWriter):
             return
 
         try:
-            if self.pending_normal_data is not None and 
self.pending_normal_data.num_rows > 0:
-                self._close_current_writers()
+            self._close_current_writers()
             if self._external_storage_writer:
                 self._external_storage_writer.close()
         except Exception as e:
@@ -246,18 +254,27 @@ class DataBlobWriter(DataWriter):
         """Abort all writers and clean up resources."""
         for blob_writer in self.blob_writers.values():
             blob_writer.abort()
+        if self.vector_writer is not None:
+            self.vector_writer.abort()
         if self._external_storage_writer:
             self._external_storage_writer.abort()
         self.pending_normal_data = None
         self.committed_files.clear()
 
-    def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, 
Dict[str, pa.RecordBatch]]:
-        """Split data into normal and blob parts based on column names."""
+    def _split_data(self, data: pa.RecordBatch) -> Tuple[
+            pa.RecordBatch, Dict[str, pa.RecordBatch], 
Optional[pa.RecordBatch]]:
+        """Split data into normal, blob, and vector parts based on column 
names."""
         normal_data = data.select(self.normal_column_names) if 
self.normal_column_names else None
         blob_data_map = {
             blob_column: data.select([blob_column]) for blob_column in 
self.blob_file_column_names
         }
-        return normal_data, blob_data_map
+        vector_data = (
+            pa.RecordBatch.from_arrays(
+                [data.column(name) for name in self.vector_write_columns],
+                names=self.vector_write_columns,
+            ) if self.vector_write_columns else None
+        )
+        return normal_data, blob_data_map, vector_data
 
     def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch):
         if not self.blob_descriptor_fields:
@@ -293,10 +310,10 @@ class DataBlobWriter(DataWriter):
                     ) from e
 
     @staticmethod
-    def _process_normal_data(data: pa.RecordBatch) -> pa.Table:
+    def _process_normal_data(data: pa.RecordBatch) -> Optional[pa.Table]:
         """Process normal data (similar to base DataWriter)."""
         if data is None or data.num_rows == 0:
-            return pa.Table.from_batches([])
+            return None
         return pa.Table.from_batches([data])
 
     @staticmethod
@@ -316,30 +333,36 @@ class DataBlobWriter(DataWriter):
         return current_size > self.target_file_size
 
     def _close_current_writers(self):
-        """Close both normal and blob writers and add blob metadata after 
normal metadata (Java behavior)."""
-        if self.pending_normal_data is None or 
self.pending_normal_data.num_rows == 0:
-            return
-
-        # Close normal writer and get metadata
-        normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
+        """Close normal, blob, and vector writers; add metadata in order: 
normal, blob, vector."""
+        normal_meta = None
+        if self.pending_normal_data is not None and 
self.pending_normal_data.num_rows > 0:
+            normal_meta = 
self._write_normal_data_to_file(self.pending_normal_data)
 
         blob_metas = []
         for blob_column in self.blob_file_column_names:
             writer_metas = self.blob_writers[blob_column].prepare_commit()
-            self._validate_consistency(normal_meta, writer_metas, blob_column)
+            if normal_meta is not None:
+                self._validate_consistency(normal_meta, writer_metas, 
blob_column)
             blob_metas.extend(writer_metas)
 
-        # Add normal file metadata first
-        self.committed_files.append(normal_meta)
+        vector_metas = []
+        if self.vector_writer is not None:
+            vector_metas = self.vector_writer.prepare_commit()
+            self.vector_writer.committed_files.clear()
+            if vector_metas and normal_meta is not None:
+                self._validate_consistency(normal_meta, vector_metas, 'vector')
 
-        # Add blob file metadata after normal metadata
+        if normal_meta is not None:
+            self.committed_files.append(normal_meta)
         self.committed_files.extend(blob_metas)
+        self.committed_files.extend(vector_metas)
 
-        # Reset pending data
         self.pending_normal_data = None
 
-        logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
-                    f"added {len(blob_metas)} blob file metadata after normal 
metadata")
+        if normal_meta is not None or blob_metas or vector_metas:
+            normal_name = normal_meta.file_name if normal_meta is not None 
else '<none>'
+            logger.info(f"Closed writers - normal: {normal_name}, "
+                        f"{len(blob_metas)} blob metas, {len(vector_metas)} 
vector metas")
 
     def _write_normal_data_to_file(self, data: pa.Table) -> 
Optional[DataFileMeta]:
         if data.num_rows == 0:

Reply via email to