This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c9c55b9a8 [python] Fix Python CI failure after Daft 0.7.15 release (#8151)
1c9c55b9a8 is described below

commit 1c9c55b9a88a8b593113dd66a52294af78e33d72
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Jun 7 21:12:22 2026 +0800

    [python] Fix Python CI failure after Daft 0.7.15 release (#8151)
    
    Python CI installs the latest Daft version. After Daft 0.7.15 was
    released, blob tests started failing because Daft now expects the File
    range fields to be named `position` and `size` instead of `offset` and
    `length`.
    
    PyPaimon still generated `offset` and `length` for blob File values, so
    this PR chooses the field names by Daft version and keeps the tests
    compatible with both old and new Daft versions. It also shrinks the
    large-blob test payloads so they stay small enough for CI.
---
 paimon-python/pypaimon/daft/daft_blob.py           |  9 ++-
 paimon-python/pypaimon/daft/daft_compat.py         | 16 +++-
 paimon-python/pypaimon/tests/blob_table_test.py    | 87 +++++++++++-----------
 .../pypaimon/tests/daft/daft_sink_test.py          |  7 +-
 4 files changed, 65 insertions(+), 54 deletions(-)

diff --git a/paimon-python/pypaimon/daft/daft_blob.py b/paimon-python/pypaimon/daft/daft_blob.py
index 156cda52bf..07691883f9 100644
--- a/paimon-python/pypaimon/daft/daft_blob.py
+++ b/paimon-python/pypaimon/daft/daft_blob.py
@@ -24,12 +24,15 @@ import struct
 
 import pyarrow as pa
 
+from pypaimon.daft.daft_compat import file_range_position_field, file_range_size_field
+
+
 FILE_PHYSICAL_TYPE = pa.struct(
     [
         pa.field("url", pa.large_utf8()),
         pa.field("io_config", pa.large_binary()),
-        pa.field("offset", pa.int64()),
-        pa.field("length", pa.int64()),
+        pa.field(file_range_position_field(), pa.int64()),
+        pa.field(file_range_size_field(), pa.int64()),
     ]
 )
 
@@ -82,5 +85,5 @@ def blob_column_to_file_array(column: pa.Array) -> pa.Array:
             pa.array(offsets, type=pa.int64()),
             pa.array(lengths, type=pa.int64()),
         ],
-        names=["url", "io_config", "offset", "length"],
+        names=["url", "io_config", file_range_position_field(), file_range_size_field()],
     )
diff --git a/paimon-python/pypaimon/daft/daft_compat.py b/paimon-python/pypaimon/daft/daft_compat.py
index cfc3e35c36..27a3ff542f 100644
--- a/paimon-python/pypaimon/daft/daft_compat.py
+++ b/paimon-python/pypaimon/daft/daft_compat.py
@@ -45,16 +45,26 @@ def _parse_daft_version() -> tuple[int, ...]:
 
 
 def has_file_range_reads() -> bool:
-    """True if the installed Daft supports File offset/length (>= 0.7.11)."""
+    """True if the installed Daft supports File range metadata (>= 0.7.11)."""
     return _parse_daft_version() >= (0, 7, 11)
 
 
+def file_range_position_field() -> str:
+    """Return Daft File's physical position field name."""
+    return "position" if _parse_daft_version() >= (0, 7, 15) else "offset"
+
+
+def file_range_size_field() -> str:
+    """Return Daft File's physical size field name."""
+    return "size" if _parse_daft_version() >= (0, 7, 15) else "length"
+
+
 def require_file_range_reads(feature: str = "BLOB columns") -> None:
-    """Raise if Daft is too old for File offset/length support (requires >= 
0.7.11)."""
+    """Raise if Daft is too old for File range metadata support."""
     if not has_file_range_reads():
         v = ".".join(str(x) for x in _parse_daft_version())
         raise NotImplementedError(
-            f"{feature} require daft >= 0.7.11 (File offset/length support), "
+            f"{feature} require daft >= 0.7.11 (File range metadata support), "
             f"but found {v}. "
             f"Please upgrade: pip install 'daft>=0.7.11'"
         )
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py 
b/paimon-python/pypaimon/tests/blob_table_test.py
index 3d33594c4a..7619fcb6de 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -412,7 +412,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         blob_writer.close()
 
     def test_dedicated_format_writer_write_large_blob(self):
-        """Test DedicatedFormatWriter with very large blob data (50MB per 
item) in 10 batches."""
+        """Test DedicatedFormatWriter with large blob data across multiple 
batches."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -436,27 +436,26 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         write_builder = table.new_batch_write_builder()
         blob_writer = write_builder.new_write()
 
-        # Create 5MB blob data per item
-        target_size = 5 * 1024 * 1024  # 5MB in bytes
+        # Keep the payload small enough for CI while still exercising large 
BLOB writes.
+        target_size = 512 * 1024  # 512KB in bytes
         blob_pattern = b'LARGE_BLOB_DATA_PATTERN_' + b'X' * 1024  # ~1KB 
pattern
         pattern_size = len(blob_pattern)
         repetitions = target_size // pattern_size
         large_blob_data = blob_pattern * repetitions
 
-        # Verify the blob size is approximately 5MB
+        # Verify the blob size is approximately 512KB
         blob_size_mb = len(large_blob_data) / (1024 * 1024)
-        self.assertGreater(blob_size_mb, 4)  # Should be at least 4MB
-        self.assertLess(blob_size_mb, 6)  # Should be less than 6MB
+        self.assertGreater(blob_size_mb, 0.4)  # Should be at least 400KB
+        self.assertLess(blob_size_mb, 0.6)  # Should be less than 600KB
 
         total_rows = 0
 
-        # Write 10 batches, each with 5 rows (50 rows total)
-        # Total data volume: 50 rows * 5MB = 250MB of blob data
-        for batch_num in range(10):
+        # Write 5 batches, each with 4 rows (20 rows total).
+        for batch_num in range(5):
             batch_data = pa.Table.from_pydict({
-                'id': [batch_num * 5 + i for i in range(5)],
-                'description': [f'Large blob batch {batch_num}, row {i}' for i 
in range(5)],
-                'large_blob': [large_blob_data] * 5  # 5 rows per batch, each 
with 5MB blob
+                'id': [batch_num * 4 + i for i in range(4)],
+                'description': [f'Large blob batch {batch_num}, row {i}' for i 
in range(4)],
+                'large_blob': [large_blob_data] * 4
             }, schema=pa_schema)
 
             # Write each batch
@@ -465,7 +464,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
                 total_rows += batch.num_rows
 
             # Log progress for large data processing
-            print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows} 
rows")
+            print(f"Completed batch {batch_num + 1}/5 with {batch.num_rows} 
rows")
 
         # Record count is tracked internally by DedicatedFormatWriter
 
@@ -498,12 +497,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
                 self.assertGreater(file_meta.row_count, 0)
                 total_file_size += file_meta.file_size
 
-        # Verify total data written (50 rows of normal data + 50 rows of blob 
data = 100 total)
-        self.assertEqual(total_row_count, 50)
+        self.assertEqual(total_row_count, 20)
 
-        # Verify total file size is substantial (should be at least 200MB)
         total_size_mb = total_file_size / (1024 * 1024)
-        self.assertGreater(total_size_mb, 200)
+        self.assertGreater(total_file_size, len(large_blob_data) * 
total_row_count)
 
         total_files = sum(len(commit_msg.new_files) for commit_msg in 
commit_messages)
         print(f"Total data written: {total_size_mb:.2f}MB across {total_files} 
files")
@@ -2256,7 +2253,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
             f"✅ Mixed sizes end-to-end test passed: wrote and read back blobs 
ranging from {min(sizes)} to {max(sizes)} bytes")  # noqa: E501
 
     def test_blob_write_read_large_data_end_to_end_with_rolling(self):
-        """Test end-to-end blob functionality with large blob data (50MB per 
blob) and rolling behavior (40 blobs)."""
+        """Test end-to-end blob functionality with large blob data and rolling 
behavior."""
         from pypaimon import Schema
 
         # Create schema with blob column
@@ -2271,29 +2268,28 @@ class DedicatedFormatWriterTest(unittest.TestCase):
             pa_schema,
             options={
                 'row-tracking.enabled': 'true',
-                'data-evolution.enabled': 'true'
+                'data-evolution.enabled': 'true',
+                'blob.target-file-size': '5MB'
             }
         )
         self.catalog.create_table('test_db.blob_large_rolling_e2e', schema, 
False)
         table = self.catalog.get_table('test_db.blob_large_rolling_e2e')
 
-        # Create large blob data (50MB per blob)
-        large_blob_size = 50 * 1024 * 1024  # 50MB
+        num_blobs = 10
+        large_blob_size = 2 * 1024 * 1024  # 2MB
         blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
         pattern_size = len(blob_pattern)
         repetitions = large_blob_size // pattern_size
         large_blob_data = blob_pattern * repetitions
 
-        # Verify the blob size is exactly 50MB
+        # Verify the blob size is approximately 2MB.
         actual_size = len(large_blob_data)
         print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024 
* 1024):.2f} MB)")
 
-        # Write 40 batches of data (each with 1 blob of 50MB)
         write_builder = table.new_batch_write_builder()
         writer = write_builder.new_write()
 
-        # Write all 40 batches first
-        for batch_id in range(40):
+        for batch_id in range(num_blobs):
             # Create test data for this batch
             test_data = pa.Table.from_pydict({
                 'id': [batch_id + 1],
@@ -2305,11 +2301,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
             # Write data
             writer.write_arrow(test_data)
 
-            # Print progress every 10 batches
-            if (batch_id + 1) % 10 == 0:
-                print(f"✅ Written batch {batch_id + 1}/40: 
{len(large_blob_data):,} bytes")
+            if (batch_id + 1) % 5 == 0:
+                print(f"✅ Written batch {batch_id + 1}/{num_blobs}: 
{len(large_blob_data):,} bytes")
 
-        print("✅ Successfully wrote all 40 batches of 50MB blobs")
+        print(f"✅ Successfully wrote all {num_blobs} batches of large blobs")
 
         # Commit all data at once
         commit_messages = writer.prepare_commit()
@@ -2333,7 +2328,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         commit.commit(commit_messages)
         writer.close()
 
-        print(f"✅ Successfully committed {len(commit_messages)} commit 
messages with 40 batches of 50MB blobs")
+        print(
+            f"✅ Successfully committed {len(commit_messages)} commit messages "
+            f"with {num_blobs} batches of large blobs"
+        )
 
         # Read data back
         read_builder = table.new_read_builder()
@@ -2342,16 +2340,16 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         splits = table_scan.plan().splits()
         result = table_read.to_arrow(splits)
 
-        self.assertEqual(sum([s.row_count for s in splits]), 40 * 2)
+        self.assertEqual(sum([s.row_count for s in splits]), num_blobs * 2)
 
         # Verify the data
-        self.assertEqual(result.num_rows, 40, "Should have 40 rows")
+        self.assertEqual(result.num_rows, num_blobs, f"Should have {num_blobs} 
rows")
         self.assertEqual(result.num_columns, 4, "Should have 4 columns")
 
         # Verify normal columns
-        expected_ids = list(range(1, 41))
-        expected_batch_ids = list(range(40))
-        expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)]
+        expected_ids = list(range(1, num_blobs + 1))
+        expected_batch_ids = list(range(num_blobs))
+        expected_metadata = [f'Large blob batch {i}' for i in range(1, 
num_blobs + 1)]
 
         self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID 
column should match")
         self.assertEqual(result.column('batch_id').to_pylist(), 
expected_batch_ids,
@@ -2361,25 +2359,24 @@ class DedicatedFormatWriterTest(unittest.TestCase):
 
         # Verify blob data integrity
         blob_data = result.column('large_blob').to_pylist()
-        self.assertEqual(len(blob_data), 40, "Should have 40 blob records")
+        self.assertEqual(len(blob_data), num_blobs, f"Should have {num_blobs} 
blob records")
 
         # Verify each blob
         for i, blob in enumerate(blob_data):
             self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1} 
should be {large_blob_size:,} bytes")
             self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content 
should match exactly")
 
-            # Print progress every 10 blobs
-            if (i + 1) % 10 == 0:
-                print(f"✅ Verified blob {i + 1}/40: {len(blob):,} bytes")
+            if (i + 1) % 5 == 0:
+                print(f"✅ Verified blob {i + 1}/{num_blobs}: {len(blob):,} 
bytes")
 
         # Verify total data size
         total_blob_size = sum(len(blob) for blob in blob_data)
-        expected_total_size = 40 * len(large_blob_data)
+        expected_total_size = num_blobs * len(large_blob_data)
         self.assertEqual(total_blob_size, expected_total_size,
                          f"Total blob size should be {expected_total_size:,} 
bytes")
 
         print("✅ Large blob rolling end-to-end test passed:")
-        print("   - Wrote and read back 40 blobs of 50MB each")
+        print(f"   - Wrote and read back {num_blobs} large blobs")
         print(
             f"   - Total data size: {total_blob_size:,} bytes 
({total_blob_size / (1024 * 1024 * 1024):.2f} GB)")  # noqa: E501
         print("   - All blob content verified as correct")
@@ -3029,9 +3026,9 @@ class DedicatedFormatWriterTest(unittest.TestCase):
         Test writing and reading large blob data with file rolling and 
sharding.
 
         Test workflow:
-        - Creates a table with blob column and 10MB target file size
+        - Creates a table with blob column and small target file size
         - Writes 4 batches of 40 records each (160 total records)
-        - Each record contains a 3MB blob
+        - Each record contains a 64KB blob
         - Reads data using 3-way sharding (shard 0 - 3)
         - Verifies blob data integrity and size
         - Compares concatenated sharded results with full table scan
@@ -3050,14 +3047,14 @@ class DedicatedFormatWriterTest(unittest.TestCase):
             options={
                 'row-tracking.enabled': 'true',
                 'data-evolution.enabled': 'true',
-                'blob.target-file-size': '10MB'
+                'blob.target-file-size': '512KB'
             }
         )
         self.catalog.create_table('test_db.blob_rolling_with_shard', schema, 
False)
         table = self.catalog.get_table('test_db.blob_rolling_with_shard')
 
         # Create large blob data
-        large_blob_size = 3 * 1024 * 1024
+        large_blob_size = 64 * 1024
         blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024  # ~1KB pattern
         pattern_size = len(blob_pattern)
         repetitions = large_blob_size // pattern_size
diff --git a/paimon-python/pypaimon/tests/daft/daft_sink_test.py 
b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
index ad963de444..34f434a2b1 100644
--- a/paimon-python/pypaimon/tests/daft/daft_sink_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
@@ -31,7 +31,7 @@ import pytest
 pypaimon = pytest.importorskip("pypaimon")
 daft = pytest.importorskip("daft")
 
-from pypaimon.daft.daft_compat import has_file_range_reads
+from pypaimon.daft.daft_compat import file_range_position_field, 
has_file_range_reads
 from pypaimon.daft.daft_catalog import PaimonTable
 from pypaimon.daft.daft_datasink import PaimonDataSink
 from pypaimon.daft.daft_paimon import _read_table, _write_table
@@ -483,8 +483,9 @@ class TestBlobType:
             assert isinstance(ref, daft.File)
             assert isinstance(ref.path, str)
             assert ".blob" in ref.path
-            assert ref.offset is not None
-            assert ref.length is not None
+            assert getattr(ref, file_range_position_field()) is not None
+            file_size = ref.size() if callable(getattr(ref, "size", None)) 
else ref.length
+            assert file_size is not None
 
 
 # ---------------------------------------------------------------------------

Reply via email to