This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1c9c55b9a8 [python] Fix Python CI failure after Daft 0.7.15 release
(#8151)
1c9c55b9a8 is described below
commit 1c9c55b9a88a8b593113dd66a52294af78e33d72
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Jun 7 21:12:22 2026 +0800
[python] Fix Python CI failure after Daft 0.7.15 release (#8151)
Python CI installs the latest Daft version. After Daft 0.7.15 was
released, blob tests started failing because Daft now expects the File
range fields to be named `position` and `size` instead of `offset` and
`length`.
PyPaimon still generated `offset` and `length` for blob File values, so
this PR chooses the field names by Daft version and keeps tests
compatible with both old and new Daft versions.
It also reduces the blob payload sizes used by the large-blob tests so
that they stay small enough for CI.
---
paimon-python/pypaimon/daft/daft_blob.py | 9 ++-
paimon-python/pypaimon/daft/daft_compat.py | 16 +++-
paimon-python/pypaimon/tests/blob_table_test.py | 87 +++++++++++-----------
.../pypaimon/tests/daft/daft_sink_test.py | 7 +-
4 files changed, 65 insertions(+), 54 deletions(-)
diff --git a/paimon-python/pypaimon/daft/daft_blob.py
b/paimon-python/pypaimon/daft/daft_blob.py
index 156cda52bf..07691883f9 100644
--- a/paimon-python/pypaimon/daft/daft_blob.py
+++ b/paimon-python/pypaimon/daft/daft_blob.py
@@ -24,12 +24,15 @@ import struct
import pyarrow as pa
+from pypaimon.daft.daft_compat import file_range_position_field,
file_range_size_field
+
+
FILE_PHYSICAL_TYPE = pa.struct(
[
pa.field("url", pa.large_utf8()),
pa.field("io_config", pa.large_binary()),
- pa.field("offset", pa.int64()),
- pa.field("length", pa.int64()),
+ pa.field(file_range_position_field(), pa.int64()),
+ pa.field(file_range_size_field(), pa.int64()),
]
)
@@ -82,5 +85,5 @@ def blob_column_to_file_array(column: pa.Array) -> pa.Array:
pa.array(offsets, type=pa.int64()),
pa.array(lengths, type=pa.int64()),
],
- names=["url", "io_config", "offset", "length"],
+ names=["url", "io_config", file_range_position_field(),
file_range_size_field()],
)
diff --git a/paimon-python/pypaimon/daft/daft_compat.py
b/paimon-python/pypaimon/daft/daft_compat.py
index cfc3e35c36..27a3ff542f 100644
--- a/paimon-python/pypaimon/daft/daft_compat.py
+++ b/paimon-python/pypaimon/daft/daft_compat.py
@@ -45,16 +45,26 @@ def _parse_daft_version() -> tuple[int, ...]:
def has_file_range_reads() -> bool:
- """True if the installed Daft supports File offset/length (>= 0.7.11)."""
+ """True if the installed Daft supports File range metadata (>= 0.7.11)."""
return _parse_daft_version() >= (0, 7, 11)
+def file_range_position_field() -> str:
+ """Return Daft File's physical position field name."""
+ return "position" if _parse_daft_version() >= (0, 7, 15) else "offset"
+
+
+def file_range_size_field() -> str:
+ """Return Daft File's physical size field name."""
+ return "size" if _parse_daft_version() >= (0, 7, 15) else "length"
+
+
def require_file_range_reads(feature: str = "BLOB columns") -> None:
- """Raise if Daft is too old for File offset/length support (requires >=
0.7.11)."""
+ """Raise if Daft is too old for File range metadata support."""
if not has_file_range_reads():
v = ".".join(str(x) for x in _parse_daft_version())
raise NotImplementedError(
- f"{feature} require daft >= 0.7.11 (File offset/length support), "
+ f"{feature} require daft >= 0.7.11 (File range metadata support), "
f"but found {v}. "
f"Please upgrade: pip install 'daft>=0.7.11'"
)
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py
b/paimon-python/pypaimon/tests/blob_table_test.py
index 3d33594c4a..7619fcb6de 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -412,7 +412,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
blob_writer.close()
def test_dedicated_format_writer_write_large_blob(self):
- """Test DedicatedFormatWriter with very large blob data (50MB per
item) in 10 batches."""
+ """Test DedicatedFormatWriter with large blob data across multiple
batches."""
from pypaimon import Schema
# Create schema with blob column
@@ -436,27 +436,26 @@ class DedicatedFormatWriterTest(unittest.TestCase):
write_builder = table.new_batch_write_builder()
blob_writer = write_builder.new_write()
- # Create 5MB blob data per item
- target_size = 5 * 1024 * 1024 # 5MB in bytes
+ # Keep the payload small enough for CI while still exercising large
BLOB writes.
+ target_size = 512 * 1024 # 512KB in bytes
blob_pattern = b'LARGE_BLOB_DATA_PATTERN_' + b'X' * 1024 # ~1KB
pattern
pattern_size = len(blob_pattern)
repetitions = target_size // pattern_size
large_blob_data = blob_pattern * repetitions
- # Verify the blob size is approximately 5MB
+ # Verify the blob size is approximately 512KB
blob_size_mb = len(large_blob_data) / (1024 * 1024)
- self.assertGreater(blob_size_mb, 4) # Should be at least 4MB
- self.assertLess(blob_size_mb, 6) # Should be less than 6MB
+ self.assertGreater(blob_size_mb, 0.4) # Should be at least 400KB
+ self.assertLess(blob_size_mb, 0.6) # Should be less than 600KB
total_rows = 0
- # Write 10 batches, each with 5 rows (50 rows total)
- # Total data volume: 50 rows * 5MB = 250MB of blob data
- for batch_num in range(10):
+ # Write 5 batches, each with 4 rows (20 rows total).
+ for batch_num in range(5):
batch_data = pa.Table.from_pydict({
- 'id': [batch_num * 5 + i for i in range(5)],
- 'description': [f'Large blob batch {batch_num}, row {i}' for i
in range(5)],
- 'large_blob': [large_blob_data] * 5 # 5 rows per batch, each
with 5MB blob
+ 'id': [batch_num * 4 + i for i in range(4)],
+ 'description': [f'Large blob batch {batch_num}, row {i}' for i
in range(4)],
+ 'large_blob': [large_blob_data] * 4
}, schema=pa_schema)
# Write each batch
@@ -465,7 +464,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
total_rows += batch.num_rows
# Log progress for large data processing
- print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows}
rows")
+ print(f"Completed batch {batch_num + 1}/5 with {batch.num_rows}
rows")
# Record count is tracked internally by DedicatedFormatWriter
@@ -498,12 +497,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
self.assertGreater(file_meta.row_count, 0)
total_file_size += file_meta.file_size
- # Verify total data written (50 rows of normal data + 50 rows of blob
data = 100 total)
- self.assertEqual(total_row_count, 50)
+ self.assertEqual(total_row_count, 20)
- # Verify total file size is substantial (should be at least 200MB)
total_size_mb = total_file_size / (1024 * 1024)
- self.assertGreater(total_size_mb, 200)
+ self.assertGreater(total_file_size, len(large_blob_data) *
total_row_count)
total_files = sum(len(commit_msg.new_files) for commit_msg in
commit_messages)
print(f"Total data written: {total_size_mb:.2f}MB across {total_files}
files")
@@ -2256,7 +2253,7 @@ class DedicatedFormatWriterTest(unittest.TestCase):
f"✅ Mixed sizes end-to-end test passed: wrote and read back blobs
ranging from {min(sizes)} to {max(sizes)} bytes") # noqa: E501
def test_blob_write_read_large_data_end_to_end_with_rolling(self):
- """Test end-to-end blob functionality with large blob data (50MB per
blob) and rolling behavior (40 blobs)."""
+ """Test end-to-end blob functionality with large blob data and rolling
behavior."""
from pypaimon import Schema
# Create schema with blob column
@@ -2271,29 +2268,28 @@ class DedicatedFormatWriterTest(unittest.TestCase):
pa_schema,
options={
'row-tracking.enabled': 'true',
- 'data-evolution.enabled': 'true'
+ 'data-evolution.enabled': 'true',
+ 'blob.target-file-size': '5MB'
}
)
self.catalog.create_table('test_db.blob_large_rolling_e2e', schema,
False)
table = self.catalog.get_table('test_db.blob_large_rolling_e2e')
- # Create large blob data (50MB per blob)
- large_blob_size = 50 * 1024 * 1024 # 50MB
+ num_blobs = 10
+ large_blob_size = 2 * 1024 * 1024 # 2MB
blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
pattern_size = len(blob_pattern)
repetitions = large_blob_size // pattern_size
large_blob_data = blob_pattern * repetitions
- # Verify the blob size is exactly 50MB
+ # Verify the blob size is approximately 2MB.
actual_size = len(large_blob_data)
print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024
* 1024):.2f} MB)")
- # Write 40 batches of data (each with 1 blob of 50MB)
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
- # Write all 40 batches first
- for batch_id in range(40):
+ for batch_id in range(num_blobs):
# Create test data for this batch
test_data = pa.Table.from_pydict({
'id': [batch_id + 1],
@@ -2305,11 +2301,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
# Write data
writer.write_arrow(test_data)
- # Print progress every 10 batches
- if (batch_id + 1) % 10 == 0:
- print(f"✅ Written batch {batch_id + 1}/40:
{len(large_blob_data):,} bytes")
+ if (batch_id + 1) % 5 == 0:
+ print(f"✅ Written batch {batch_id + 1}/{num_blobs}:
{len(large_blob_data):,} bytes")
- print("✅ Successfully wrote all 40 batches of 50MB blobs")
+ print(f"✅ Successfully wrote all {num_blobs} batches of large blobs")
# Commit all data at once
commit_messages = writer.prepare_commit()
@@ -2333,7 +2328,10 @@ class DedicatedFormatWriterTest(unittest.TestCase):
commit.commit(commit_messages)
writer.close()
- print(f"✅ Successfully committed {len(commit_messages)} commit
messages with 40 batches of 50MB blobs")
+ print(
+ f"✅ Successfully committed {len(commit_messages)} commit messages "
+ f"with {num_blobs} batches of large blobs"
+ )
# Read data back
read_builder = table.new_read_builder()
@@ -2342,16 +2340,16 @@ class DedicatedFormatWriterTest(unittest.TestCase):
splits = table_scan.plan().splits()
result = table_read.to_arrow(splits)
- self.assertEqual(sum([s.row_count for s in splits]), 40 * 2)
+ self.assertEqual(sum([s.row_count for s in splits]), num_blobs * 2)
# Verify the data
- self.assertEqual(result.num_rows, 40, "Should have 40 rows")
+ self.assertEqual(result.num_rows, num_blobs, f"Should have {num_blobs}
rows")
self.assertEqual(result.num_columns, 4, "Should have 4 columns")
# Verify normal columns
- expected_ids = list(range(1, 41))
- expected_batch_ids = list(range(40))
- expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)]
+ expected_ids = list(range(1, num_blobs + 1))
+ expected_batch_ids = list(range(num_blobs))
+ expected_metadata = [f'Large blob batch {i}' for i in range(1,
num_blobs + 1)]
self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID
column should match")
self.assertEqual(result.column('batch_id').to_pylist(),
expected_batch_ids,
@@ -2361,25 +2359,24 @@ class DedicatedFormatWriterTest(unittest.TestCase):
# Verify blob data integrity
blob_data = result.column('large_blob').to_pylist()
- self.assertEqual(len(blob_data), 40, "Should have 40 blob records")
+ self.assertEqual(len(blob_data), num_blobs, f"Should have {num_blobs}
blob records")
# Verify each blob
for i, blob in enumerate(blob_data):
self.assertEqual(len(blob), len(large_blob_data), f"Blob {i + 1}
should be {large_blob_size:,} bytes")
self.assertEqual(blob, large_blob_data, f"Blob {i + 1} content
should match exactly")
- # Print progress every 10 blobs
- if (i + 1) % 10 == 0:
- print(f"✅ Verified blob {i + 1}/40: {len(blob):,} bytes")
+ if (i + 1) % 5 == 0:
+ print(f"✅ Verified blob {i + 1}/{num_blobs}: {len(blob):,}
bytes")
# Verify total data size
total_blob_size = sum(len(blob) for blob in blob_data)
- expected_total_size = 40 * len(large_blob_data)
+ expected_total_size = num_blobs * len(large_blob_data)
self.assertEqual(total_blob_size, expected_total_size,
f"Total blob size should be {expected_total_size:,}
bytes")
print("✅ Large blob rolling end-to-end test passed:")
- print(" - Wrote and read back 40 blobs of 50MB each")
+ print(f" - Wrote and read back {num_blobs} large blobs")
print(
f" - Total data size: {total_blob_size:,} bytes
({total_blob_size / (1024 * 1024 * 1024):.2f} GB)") # noqa: E501
print(" - All blob content verified as correct")
@@ -3029,9 +3026,9 @@ class DedicatedFormatWriterTest(unittest.TestCase):
Test writing and reading large blob data with file rolling and
sharding.
Test workflow:
- - Creates a table with blob column and 10MB target file size
+ - Creates a table with blob column and small target file size
- Writes 4 batches of 40 records each (160 total records)
- - Each record contains a 3MB blob
+ - Each record contains a 64KB blob
- Reads data using 3-way sharding (shard 0 - 3)
- Verifies blob data integrity and size
- Compares concatenated sharded results with full table scan
@@ -3050,14 +3047,14 @@ class DedicatedFormatWriterTest(unittest.TestCase):
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
- 'blob.target-file-size': '10MB'
+ 'blob.target-file-size': '512KB'
}
)
self.catalog.create_table('test_db.blob_rolling_with_shard', schema,
False)
table = self.catalog.get_table('test_db.blob_rolling_with_shard')
# Create large blob data
- large_blob_size = 3 * 1024 * 1024
+ large_blob_size = 64 * 1024
blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern
pattern_size = len(blob_pattern)
repetitions = large_blob_size // pattern_size
diff --git a/paimon-python/pypaimon/tests/daft/daft_sink_test.py
b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
index ad963de444..34f434a2b1 100644
--- a/paimon-python/pypaimon/tests/daft/daft_sink_test.py
+++ b/paimon-python/pypaimon/tests/daft/daft_sink_test.py
@@ -31,7 +31,7 @@ import pytest
pypaimon = pytest.importorskip("pypaimon")
daft = pytest.importorskip("daft")
-from pypaimon.daft.daft_compat import has_file_range_reads
+from pypaimon.daft.daft_compat import file_range_position_field,
has_file_range_reads
from pypaimon.daft.daft_catalog import PaimonTable
from pypaimon.daft.daft_datasink import PaimonDataSink
from pypaimon.daft.daft_paimon import _read_table, _write_table
@@ -483,8 +483,9 @@ class TestBlobType:
assert isinstance(ref, daft.File)
assert isinstance(ref.path, str)
assert ".blob" in ref.path
- assert ref.offset is not None
- assert ref.length is not None
+ assert getattr(ref, file_range_position_field()) is not None
+ file_size = ref.size() if callable(getattr(ref, "size", None))
else ref.length
+ assert file_size is not None
# ---------------------------------------------------------------------------