This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
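For context, the blob write/read path added by this commit is exercised end to end by the new pypaimon/tests/blob_table_test.py. A minimal sketch of that usage pattern follows; the warehouse path, database and table names are illustrative only, and a blob table requires the 'row-tracking.enabled' and 'data-evolution.enabled' options shown below:

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    # Illustrative local warehouse path; any supported filesystem works here.
    catalog = CatalogFactory.create({'warehouse': '/tmp/paimon_warehouse'})
    catalog.create_database('demo_db', False)

    # A pyarrow.large_binary() column is mapped to a Paimon blob column.
    pa_schema = pa.schema([
        ('id', pa.int32()),
        ('name', pa.string()),
        ('blob_data', pa.large_binary()),
    ])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={
            'row-tracking.enabled': 'true',    # required when a blob column is present
            'data-evolution.enabled': 'true',  # required when a blob column is present
        })
    catalog.create_table('demo_db.blob_demo', schema, False)
    table = catalog.get_table('demo_db.blob_demo')

    # Write: on commit, blob rows produce a .parquet file plus .blob files.
    write_builder = table.new_batch_write_builder()
    writer = write_builder.new_write()
    writer.write_arrow(pa.Table.from_pydict(
        {'id': [1, 2], 'name': ['a', 'b'], 'blob_data': [b'x', b'y']},
        schema=pa_schema))
    write_builder.new_commit().commit(writer.prepare_commit())
    writer.close()

    # Read back: data-evolution splits merge the parquet and blob files by row id.
    read_builder = table.new_read_builder()
    result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())

Schema.from_pyarrow_schema now rejects blob schemas that omit these options or declare a primary key, as the validation added to pypaimon/schema/schema.py in the diff below shows.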
commit 462c4203ac309a336e67fb1ae58232dca303a868 Author: YeJunHao <[email protected]> AuthorDate: Fri Oct 17 13:33:45 2025 +0800 [python] Support blob read && write (#6420) --- paimon-python/pypaimon/common/core_options.py | 2 + paimon-python/pypaimon/common/file_io.py | 1 - .../pypaimon/read/reader/concat_batch_reader.py | 68 ++ .../pypaimon/read/scanner/full_starting_scanner.py | 117 +- paimon-python/pypaimon/read/split_read.py | 42 +- paimon-python/pypaimon/read/table_read.py | 3 +- paimon-python/pypaimon/schema/data_types.py | 1 + paimon-python/pypaimon/schema/schema.py | 36 +- paimon-python/pypaimon/tests/blob_table_test.py | 1177 ++++++++++++++++++++ paimon-python/pypaimon/write/file_store_commit.py | 3 +- paimon-python/pypaimon/write/file_store_write.py | 22 +- .../writer/blob_writer.py} | 61 +- .../pypaimon/write/writer/data_blob_writer.py | 321 ++++++ paimon-python/pypaimon/write/writer/data_writer.py | 49 +- 14 files changed, 1844 insertions(+), 59 deletions(-) diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py index 8ab26fe062..da1ad0674e 100644 --- a/paimon-python/pypaimon/common/core_options.py +++ b/paimon-python/pypaimon/common/core_options.py @@ -49,3 +49,5 @@ class CoreOptions(str, Enum): INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp" # Commit options COMMIT_USER_PREFIX = "commit.user-prefix" + ROW_TRACKING_ENABLED = "row-tracking.enabled" + DATA_EVOLUTION_ENABLED = "data-evolution.enabled" diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index eb32ebb755..f881ba77bc 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -25,7 +25,6 @@ from urllib.parse import splitport, urlparse import pyarrow from packaging.version import parse from pyarrow._fs import FileSystem - from pypaimon.common.config import OssOptions, S3Options from pypaimon.common.uri_reader import UriReaderFactory from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py index a5a596e1ea..de4f10c15b 100644 --- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py @@ -19,6 +19,7 @@ import collections from typing import Callable, List, Optional +import pyarrow as pa from pyarrow import RecordBatch from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader @@ -76,3 +77,70 @@ class ShardBatchReader(ConcatBatchReader): return batch.slice(0, self.split_end_row - cur_begin) else: return batch + + +class MergeAllBatchReader(RecordBatchReader): + """ + A reader that accepts multiple reader suppliers and concatenates all their arrow batches + into one big batch. This is useful when you want to merge all data from multiple sources + into a single batch for processing. 
+ """ + + def __init__(self, reader_suppliers: List[Callable]): + self.reader_suppliers = reader_suppliers + self.merged_batch: Optional[RecordBatch] = None + self.batch_created = False + + def read_arrow_batch(self) -> Optional[RecordBatch]: + if self.batch_created: + return None + + all_batches = [] + + # Read all batches from all reader suppliers + for supplier in self.reader_suppliers: + reader = supplier() + try: + while True: + batch = reader.read_arrow_batch() + if batch is None: + break + all_batches.append(batch) + finally: + reader.close() + + # Concatenate all batches into one big batch + if all_batches: + # For PyArrow < 17.0.0, use Table.concat_tables approach + # Convert batches to tables and concatenate + tables = [pa.Table.from_batches([batch]) for batch in all_batches] + if len(tables) == 1: + # Single table, just get the first batch + self.merged_batch = tables[0].to_batches()[0] + else: + # Multiple tables, concatenate them + concatenated_table = pa.concat_tables(tables) + # Convert back to a single batch by taking all batches and combining + all_concatenated_batches = concatenated_table.to_batches() + if len(all_concatenated_batches) == 1: + self.merged_batch = all_concatenated_batches[0] + else: + # If still multiple batches, we need to manually combine them + # This shouldn't happen with concat_tables, but just in case + combined_arrays = [] + for i in range(len(all_concatenated_batches[0].columns)): + column_arrays = [batch.column(i) for batch in all_concatenated_batches] + combined_arrays.append(pa.concat_arrays(column_arrays)) + self.merged_batch = pa.RecordBatch.from_arrays( + combined_arrays, + names=all_concatenated_batches[0].schema.names + ) + else: + self.merged_batch = None + + self.batch_created = True + return self.merged_batch + + def close(self) -> None: + self.merged_batch = None + self.batch_created = False diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py index 4275b3f3e2..73915a0412 100644 --- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py +++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py @@ -18,6 +18,7 @@ limitations under the License. 
from collections import defaultdict from typing import Callable, List, Optional +from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder from pypaimon.manifest.manifest_file_manager import ManifestFileManager @@ -65,6 +66,7 @@ class FullStartingScanner(StartingScanner): self.only_read_real_buckets = True if int( self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False + self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true' def scan(self) -> Plan: file_entries = self.plan_files() @@ -72,6 +74,8 @@ class FullStartingScanner(StartingScanner): return Plan([]) if self.table.is_primary_key_table: splits = self._create_primary_key_splits(file_entries) + elif self.data_evolution: + splits = self._create_data_evolution_splits(file_entries) else: splits = self._create_append_only_splits(file_entries) @@ -104,7 +108,7 @@ class FullStartingScanner(StartingScanner): file_entries = self._filter_by_predicate(file_entries) return file_entries - def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan': + def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'FullStartingScanner': if idx_of_this_subtask >= number_of_para_subtasks: raise Exception("idx_of_this_subtask must be less than number_of_para_subtasks") self.idx_of_this_subtask = idx_of_this_subtask @@ -357,3 +361,114 @@ class FullStartingScanner(StartingScanner): packed.append(bin_items) return packed + + def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: + partitioned_files = defaultdict(list) + for entry in file_entries: + partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) + + if self.idx_of_this_subtask is not None: + partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files) + + def weight_func(file_list: List[DataFileMeta]) -> int: + return max(sum(f.file_size for f in file_list), self.open_file_cost) + + splits = [] + for key, file_entries in partitioned_files.items(): + if not file_entries: + continue + + data_files: List[DataFileMeta] = [e.file for e in file_entries] + + # Split files by firstRowId for data evolution + split_by_row_id = self._split_by_row_id(data_files) + + # Pack the split groups for optimal split sizes + packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func, + self.target_split_size) + + # Flatten the packed files and build splits + flatten_packed_files: List[List[DataFileMeta]] = [ + [file for sub_pack in pack for file in sub_pack] + for pack in packed_files + ] + + splits += self._build_split_from_pack(flatten_packed_files, file_entries, False) + + if self.idx_of_this_subtask is not None: + self._compute_split_start_end_row(splits, plan_start_row, plan_end_row) + return splits + + def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: + split_by_row_id = [] + + def sort_key(file: DataFileMeta) -> tuple: + first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf') + is_blob = 1 if self._is_blob_file(file.file_name) else 0 + # For files with same firstRowId, sort by maxSequenceNumber in descending order + # (larger sequence number means more recent data) + max_seq = file.max_sequence_number + return (first_row_id, is_blob, -max_seq) + + sorted_files = sorted(files, key=sort_key) + + # 
Filter blob files to only include those within the row ID range of non-blob files + sorted_files = self._filter_blob(sorted_files) + + # Split files by firstRowId + last_row_id = -1 + check_row_id_start = 0 + current_split = [] + + for file in sorted_files: + first_row_id = file.first_row_id + if first_row_id is None: + # Files without firstRowId are treated as individual splits + split_by_row_id.append([file]) + continue + + if not self._is_blob_file(file.file_name) and first_row_id != last_row_id: + if current_split: + split_by_row_id.append(current_split) + + # Validate that files don't overlap + if first_row_id < check_row_id_start: + file_names = [f.file_name for f in sorted_files] + raise ValueError( + f"There are overlapping files in the split: {file_names}, " + f"the wrong file is: {file.file_name}" + ) + + current_split = [] + last_row_id = first_row_id + check_row_id_start = first_row_id + file.row_count + + current_split.append(file) + + if current_split: + split_by_row_id.append(current_split) + + return split_by_row_id + + @staticmethod + def _is_blob_file(file_name: str) -> bool: + return file_name.endswith('.blob') + + @staticmethod + def _filter_blob(files: List[DataFileMeta]) -> List[DataFileMeta]: + result = [] + row_id_start = -1 + row_id_end = -1 + + for file in files: + if not FullStartingScanner._is_blob_file(file.file_name): + if file.first_row_id is not None: + row_id_start = file.first_row_id + row_id_end = file.first_row_id + file.row_count + result.append(file) + else: + if file.first_row_id is not None and row_id_start != -1: + if row_id_start <= file.first_row_id < row_id_end: + result.append(file) + + return result diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 372679ed63..64a28ac63c 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -26,7 +26,7 @@ from pypaimon.common.predicate import Predicate from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.partition_info import PartitionInfo -from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader +from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader, MergeAllBatchReader from pypaimon.read.reader.concat_record_reader import ConcatRecordReader from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader @@ -73,21 +73,21 @@ class SplitRead(ABC): def create_reader(self) -> RecordReader: """Create a record reader for the given split.""" - def file_reader_supplier(self, file_path: str, for_merge_read: bool): + def file_reader_supplier(self, file_path: str, for_merge_read: bool, read_fields: List[str]): _, extension = os.path.splitext(file_path) file_format = extension[1:] format_reader: RecordBatchReader if file_format == CoreOptions.FILE_FORMAT_AVRO: - format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(), + format_reader = FormatAvroReader(self.table.file_io, file_path, read_fields, self.read_fields, self.push_down_predicate) elif file_format == CoreOptions.FILE_FORMAT_BLOB: blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) - format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(), + format_reader = 
FormatBlobReader(self.table.file_io, file_path, read_fields, self.read_fields, self.push_down_predicate, blob_as_descriptor) elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC: format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path, - self._get_final_read_data_fields(), self.push_down_predicate) + read_fields, self.push_down_predicate) else: raise ValueError(f"Unexpected file format: {file_format}") @@ -253,7 +253,12 @@ class RawFileSplitRead(SplitRead): def create_reader(self) -> RecordReader: data_readers = [] for file_path in self.split.file_paths: - supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=False) + supplier = partial( + self.file_reader_supplier, + file_path=file_path, + for_merge_read=False, + read_fields=self._get_final_read_data_fields(), + ) data_readers.append(supplier) if not data_readers: @@ -274,7 +279,12 @@ class RawFileSplitRead(SplitRead): class MergeFileSplitRead(SplitRead): def kv_reader_supplier(self, file_path): - reader_supplier = partial(self.file_reader_supplier, file_path=file_path, for_merge_read=True) + reader_supplier = partial( + self.file_reader_supplier, + file_path=file_path, + for_merge_read=True, + read_fields=self._get_final_read_data_fields() + ) return KeyValueWrapReader(reader_supplier(), len(self.trimmed_primary_key), self.value_arity) def section_reader_supplier(self, section: List[SortedRun]): @@ -317,7 +327,7 @@ class DataEvolutionSplitRead(SplitRead): if len(need_merge_files) == 1 or not self.read_fields: # No need to merge fields, just create a single file reader suppliers.append( - lambda f=need_merge_files[0]: self._create_file_reader(f) + lambda f=need_merge_files[0]: self._create_file_reader(f, self._get_final_read_data_fields()) ) else: suppliers.append( @@ -424,26 +434,30 @@ class DataEvolutionSplitRead(SplitRead): self.read_fields = read_fields # create reader based on read_fields # Create reader for this bunch if len(bunch.files()) == 1: - file_record_readers[i] = self._create_file_reader(bunch.files()[0]) + file_record_readers[i] = self._create_file_reader( + bunch.files()[0], [field.name for field in read_fields] + ) else: # Create concatenated reader for multiple files suppliers = [ - lambda f=file: self._create_file_reader(f) for file in bunch.files() + lambda f=file: self._create_file_reader( + f, [field.name for field in read_fields] + ) for file in bunch.files() ] - file_record_readers[i] = ConcatRecordReader(suppliers) + file_record_readers[i] = MergeAllBatchReader(suppliers) self.read_fields = table_fields # Validate that all required fields are found for i, field in enumerate(all_read_fields): if row_offsets[i] == -1: - if not field.type.is_nullable(): + if not field.type.nullable: raise ValueError(f"Field {field} is not null but can't find any file contains it.") return DataEvolutionMergeReader(row_offsets, field_offsets, file_record_readers) - def _create_file_reader(self, file: DataFileMeta) -> RecordReader: + def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> RecordReader: """Create a file reader for a single file.""" - return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False) + return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False, read_fields=read_fields) def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: """Split files into field bunches.""" diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py index cf32d04d72..4c2c615d89 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -20,6 +20,7 @@ from typing import Any, Iterator, List, Optional import pandas import pyarrow +from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder from pypaimon.read.push_down_utils import extract_predicate_to_list @@ -132,7 +133,7 @@ class TableRead: read_type=self.read_type, split=split ) - elif self.table.options.get('data-evolution.enabled', 'false').lower() == 'true': + elif self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true': return DataEvolutionSplitRead( table=self.table, predicate=self.predicate, diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py index d1ce2354a8..c65771d7f2 100644 --- a/paimon-python/pypaimon/schema/data_types.py +++ b/paimon-python/pypaimon/schema/data_types.py @@ -548,6 +548,7 @@ class PyarrowFieldParser: @staticmethod def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]: + # Convert PyArrow schema to Paimon fields fields = [] for i, pa_field in enumerate(pa_schema): pa_field: pyarrow.Field diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py index 965fe2255b..0ad53f99d3 100644 --- a/paimon-python/pypaimon/schema/schema.py +++ b/paimon-python/pypaimon/schema/schema.py @@ -20,6 +20,7 @@ from typing import Dict, List, Optional import pyarrow as pa +from pypaimon.common.core_options import CoreOptions from pypaimon.common.json_util import json_field from pypaimon.schema.data_types import DataField, PyarrowFieldParser @@ -51,4 +52,37 @@ class Schema: def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str]] = None, primary_keys: Optional[List[str]] = None, options: Optional[Dict] = None, comment: Optional[str] = None): - return Schema(PyarrowFieldParser.to_paimon_schema(pa_schema), partition_keys, primary_keys, options, comment) + # Convert PyArrow schema to Paimon fields + fields = PyarrowFieldParser.to_paimon_schema(pa_schema) + + # Check if Blob type exists in the schema + has_blob_type = any( + 'blob' in str(field.type).lower() + for field in fields + ) + + # If Blob type exists, validate required options + if has_blob_type: + if options is None: + options = {} + + required_options = { + CoreOptions.ROW_TRACKING_ENABLED: 'true', + CoreOptions.DATA_EVOLUTION_ENABLED: 'true' + } + + missing_options = [] + for key, expected_value in required_options.items(): + if key not in options or options[key] != expected_value: + missing_options.append(f"{key}='{expected_value}'") + + if missing_options: + raise ValueError( + f"Schema contains Blob type but is missing required options: {', '.join(missing_options)}. " + f"Please add these options to the schema." + ) + + if primary_keys is not None: + raise ValueError("Blob type is not supported with primary key.") + + return Schema(fields, partition_keys, primary_keys, options, comment) diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py new file mode 100644 index 0000000000..3c8273bde4 --- /dev/null +++ b/paimon-python/pypaimon/tests/blob_table_test.py @@ -0,0 +1,1177 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +import os +import shutil +import tempfile +import unittest + +import pyarrow as pa + +from pypaimon import CatalogFactory +from pypaimon.write.commit_message import CommitMessage + + +class DataBlobWriterTest(unittest.TestCase): + """Tests for DataBlobWriter functionality with paimon table operations.""" + + @classmethod + def setUpClass(cls): + """Set up test environment.""" + cls.temp_dir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.temp_dir, 'warehouse') + # Create catalog for table operations + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('test_db', False) + + @classmethod + def tearDownClass(cls): + """Clean up test environment.""" + try: + shutil.rmtree(cls.temp_dir) + except OSError: + pass + + def test_data_blob_writer_basic_functionality(self): + """Test basic DataBlobWriter functionality with paimon table.""" + from pypaimon import Schema + + # Create schema with normal and blob columns + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('blob_data', pa.large_binary()), # This will be detected as blob + ]) + + # Create Paimon schema + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + + # Create table + self.catalog.create_table('test_db.blob_writer_test', schema, False) + table = self.catalog.get_table('test_db.blob_writer_test') + + # Test data + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'name': ['Alice', 'Bob', 'Charlie'], + 'blob_data': [b'blob_data_1', b'blob_data_2', b'blob_data_3'] + }, schema=pa_schema) + + # Test DataBlobWriter initialization using proper table API + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Write test data using BatchTableWrite API + blob_writer.write_arrow(test_data) + + # Test prepare commit + commit_messages = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + self.assertGreater(len(commit_messages), 0) + + # Verify commit message structure + for commit_msg in commit_messages: + self.assertIsInstance(commit_msg.new_files, list) + self.assertGreater(len(commit_msg.new_files), 0) + + # Verify file metadata structure + for file_meta in commit_msg.new_files: + self.assertIsNotNone(file_meta.file_name) + self.assertGreater(file_meta.file_size, 0) + self.assertGreater(file_meta.row_count, 0) + + blob_writer.close() + + def test_data_blob_writer_schema_detection(self): + """Test that DataBlobWriter correctly detects blob columns from schema.""" + from pypaimon import Schema + + # Test schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('blob_field', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 
'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_detection_test', schema, False) + table = self.catalog.get_table('test_db.blob_detection_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Test that DataBlobWriter was created internally + # We can verify this by checking the internal data writers + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'blob_field': [b'blob1', b'blob2', b'blob3'] + }, schema=pa_schema) + + # Write data to trigger writer creation + blob_writer.write_arrow(test_data) + + # Verify that a DataBlobWriter was created internally + data_writers = blob_writer.file_store_write.data_writers + self.assertGreater(len(data_writers), 0) + + # Check that the writer is a DataBlobWriter + for writer in data_writers.values(): + from pypaimon.write.writer.data_blob_writer import DataBlobWriter + self.assertIsInstance(writer, DataBlobWriter) + + blob_writer.close() + + def test_data_blob_writer_no_blob_column(self): + """Test that DataBlobWriter raises error when no blob column is found.""" + from pypaimon import Schema + + # Test schema without blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.no_blob_test', schema, False) + table = self.catalog.get_table('test_db.no_blob_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + + # Test that a regular writer (not DataBlobWriter) was created + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'name': ['Alice', 'Bob', 'Charlie'] + }, schema=pa_schema) + + # Write data to trigger writer creation + writer.write_arrow(test_data) + + # Verify that a regular writer was created (not DataBlobWriter) + data_writers = writer.file_store_write.data_writers + self.assertGreater(len(data_writers), 0) + + # Check that the writer is NOT a DataBlobWriter + for writer_instance in data_writers.values(): + from pypaimon.write.writer.data_blob_writer import DataBlobWriter + self.assertNotIsInstance(writer_instance, DataBlobWriter) + + writer.close() + + def test_data_blob_writer_multiple_blob_columns(self): + """Test that DataBlobWriter raises error when multiple blob columns are found.""" + from pypaimon import Schema + + # Test schema with multiple blob columns + pa_schema = pa.schema([ + ('id', pa.int32()), + ('blob1', pa.large_binary()), + ('blob2', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.multiple_blob_test', schema, False) + table = self.catalog.get_table('test_db.multiple_blob_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + + # Test data with multiple blob columns + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'blob1': [b'blob1_1', b'blob1_2', b'blob1_3'], + 'blob2': [b'blob2_1', b'blob2_2', b'blob2_3'] + }, schema=pa_schema) + + # This should raise an error when DataBlobWriter is created internally + with self.assertRaises(ValueError) as context: + writer.write_arrow(test_data) + self.assertIn("Limit exactly one blob field in one 
paimon table yet", str(context.exception)) + + def test_data_blob_writer_write_operations(self): + """Test DataBlobWriter write operations with real data.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('document', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.write_test', schema, False) + table = self.catalog.get_table('test_db.write_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Test data + test_data = pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['Alice', 'Bob'], + 'document': [b'document_content_1', b'document_content_2'] + }, schema=pa_schema) + + # Test writing data + for batch in test_data.to_batches(): + blob_writer.write_arrow_batch(batch) + + # Test prepare commit + commit_messages = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + + blob_writer.close() + + def test_data_blob_writer_write_large_blob(self): + """Test DataBlobWriter with very large blob data (50MB per item) in 10 batches.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('description', pa.string()), + ('large_blob', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.large_blob_test', schema, False) + table = self.catalog.get_table('test_db.large_blob_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Create 50MB blob data per item + # Using a pattern to make the data more realistic and compressible + target_size = 50 * 1024 * 1024 # 50MB in bytes + blob_pattern = b'LARGE_BLOB_DATA_PATTERN_' + b'X' * 1024 # ~1KB pattern + pattern_size = len(blob_pattern) + repetitions = target_size // pattern_size + large_blob_data = blob_pattern * repetitions + + # Verify the blob size is approximately 50MB + blob_size_mb = len(large_blob_data) / (1024 * 1024) + self.assertGreater(blob_size_mb, 49) # Should be at least 49MB + self.assertLess(blob_size_mb, 51) # Should be less than 51MB + + total_rows = 0 + + # Write 10 batches, each with 5 rows (50 rows total) + # Total data volume: 50 rows * 50MB = 2.5GB of blob data + for batch_num in range(10): + batch_data = pa.Table.from_pydict({ + 'id': [batch_num * 5 + i for i in range(5)], + 'description': [f'Large blob batch {batch_num}, row {i}' for i in range(5)], + 'large_blob': [large_blob_data] * 5 # 5 rows per batch, each with 50MB blob + }, schema=pa_schema) + + # Write each batch + for batch in batch_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + # Log progress for large data processing + print(f"Completed batch {batch_num + 1}/10 with {batch.num_rows} rows") + + # Record count is tracked internally by DataBlobWriter + + # Test prepare commit + commit_messages: CommitMessage = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + # Verify we have commit messages + self.assertEqual(len(commit_messages), 1) + commit_message = commit_messages[0] + normal_file_meta = commit_message.new_files[0] + blob_file_metas = 
commit_message.new_files[1:] + # Validate row count consistency + parquet_row_count = normal_file_meta.row_count + blob_row_count_sum = sum(meta.row_count for meta in blob_file_metas) + self.assertEqual(parquet_row_count, blob_row_count_sum, + f"Parquet row count ({parquet_row_count}) should equal " + f"sum of blob row counts ({blob_row_count_sum})") + + # Verify commit message structure and file metadata + total_file_size = 0 + total_row_count = parquet_row_count + for commit_msg in commit_messages: + self.assertIsInstance(commit_msg.new_files, list) + self.assertGreater(len(commit_msg.new_files), 0) + + # Verify file metadata structure + for file_meta in commit_msg.new_files: + self.assertIsNotNone(file_meta.file_name) + self.assertGreater(file_meta.file_size, 0) + self.assertGreater(file_meta.row_count, 0) + total_file_size += file_meta.file_size + + # Verify total data written (50 rows of normal data + 50 rows of blob data = 100 total) + self.assertEqual(total_row_count, 50) + + # Verify total file size is substantial (should be much larger than 2.5GB due to overhead) + total_size_mb = total_file_size / (1024 * 1024) + self.assertGreater(total_size_mb, 2000) # Should be at least 2GB due to overhead + + total_files = sum(len(commit_msg.new_files) for commit_msg in commit_messages) + print(f"Total data written: {total_size_mb:.2f}MB across {total_files} files") + print(f"Total rows processed: {total_row_count}") + + blob_writer.close() + + def test_data_blob_writer_abort_functionality(self): + """Test DataBlobWriter abort functionality.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('blob_data', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.abort_test', schema, False) + table = self.catalog.get_table('test_db.abort_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Test data + test_data = pa.Table.from_pydict({ + 'id': [1, 2], + 'blob_data': [b'blob_1', b'blob_2'] + }, schema=pa_schema) + + # Write some data + for batch in test_data.to_batches(): + blob_writer.write_arrow_batch(batch) + + # Test abort - BatchTableWrite doesn't have abort method + # The abort functionality is handled internally by DataBlobWriter + + blob_writer.close() + + def test_data_blob_writer_multiple_batches(self): + """Test DataBlobWriter with multiple batches and verify results.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('document', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.multiple_batches_test', schema, False) + table = self.catalog.get_table('test_db.multiple_batches_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Test data - multiple batches + batch1_data = pa.Table.from_pydict({ + 'id': [1, 2], + 'name': ['Alice', 'Bob'], + 'document': [b'document_1_content', b'document_2_content'] + }, schema=pa_schema) + + batch2_data = pa.Table.from_pydict({ + 'id': [3, 4, 5], + 'name': ['Charlie', 'David', 'Eve'], + 'document': 
[b'document_3_content', b'document_4_content', b'document_5_content'] + }, schema=pa_schema) + + batch3_data = pa.Table.from_pydict({ + 'id': [6], + 'name': ['Frank'], + 'document': [b'document_6_content'] + }, schema=pa_schema) + + # Write multiple batches + total_rows = 0 + for batch in batch1_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + for batch in batch2_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + for batch in batch3_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + # Record count is tracked internally by DataBlobWriter + + # Test prepare commit + commit_messages = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + + # Verify we have committed files + self.assertGreater(len(commit_messages), 0) + + blob_writer.close() + + def test_data_blob_writer_large_batches(self): + """Test DataBlobWriter with large batches to test rolling behavior.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('description', pa.string()), + ('large_blob', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.large_batches_test', schema, False) + table = self.catalog.get_table('test_db.large_batches_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Create large batches with substantial blob data + large_blob_data = b'L' * 10000 # 10KB blob data + + # Batch 1: 100 rows + batch1_data = pa.Table.from_pydict({ + 'id': list(range(1, 101)), + 'description': [f'Description for row {i}' for i in range(1, 101)], + 'large_blob': [large_blob_data] * 100 + }, schema=pa_schema) + + # Batch 2: 50 rows + batch2_data = pa.Table.from_pydict({ + 'id': list(range(101, 151)), + 'description': [f'Description for row {i}' for i in range(101, 151)], + 'large_blob': [large_blob_data] * 50 + }, schema=pa_schema) + + # Write large batches + total_rows = 0 + for batch in batch1_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + for batch in batch2_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + # Record count is tracked internally by DataBlobWriter + + # Test prepare commit + commit_messages = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + + # Verify we have committed files + self.assertGreater(len(commit_messages), 0) + + blob_writer.close() + + def test_data_blob_writer_mixed_data_types(self): + """Test DataBlobWriter with mixed data types in blob column.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('type', pa.string()), + ('data', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.mixed_data_test', schema, False) + table = self.catalog.get_table('test_db.mixed_data_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Test data with different types of blob content + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3, 4, 
5], + 'type': ['text', 'json', 'binary', 'image', 'pdf'], + 'data': [ + b'This is text content', + b'{"key": "value", "number": 42}', + b'\x00\x01\x02\x03\xff\xfe\xfd', + b'PNG_IMAGE_DATA_PLACEHOLDER', + b'%PDF-1.4\nPDF_CONTENT_PLACEHOLDER' + ] + }, schema=pa_schema) + + # Write mixed data + total_rows = 0 + for batch in test_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + # Record count is tracked internally by DataBlobWriter + + # Test prepare commit + commit_messages = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + + # Verify we have committed files + self.assertGreater(len(commit_messages), 0) + + # Verify commit message structure + for commit_msg in commit_messages: + self.assertIsInstance(commit_msg.new_files, list) + self.assertGreater(len(commit_msg.new_files), 0) + + # Verify file metadata structure + for file_meta in commit_msg.new_files: + self.assertIsNotNone(file_meta.file_name) + self.assertGreater(file_meta.file_size, 0) + self.assertGreater(file_meta.row_count, 0) + + # Should have both normal and blob files + file_names = [f.file_name for f in commit_msg.new_files] + parquet_files = [f for f in file_names if f.endswith('.parquet')] + blob_files = [f for f in file_names if f.endswith('.blob')] + + self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") + self.assertGreater(len(blob_files), 0, "Should have at least one blob file") + + # Create commit and commit the data + commit = write_builder.new_commit() + commit.commit(commit_messages) + blob_writer.close() + + # Read data back using table API + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + result = table_read.to_arrow(table_scan.plan().splits()) + + # Verify the data was read back correctly + self.assertEqual(result.num_rows, 5, "Should have 5 rows") + self.assertEqual(result.num_columns, 3, "Should have 3 columns") + + # Convert result to pandas for easier comparison + result_df = result.to_pandas() + + # Verify each row matches the original data + for i in range(5): + original_id = test_data.column('id')[i].as_py() + original_type = test_data.column('type')[i].as_py() + original_data = test_data.column('data')[i].as_py() + + result_id = result_df.iloc[i]['id'] + result_type = result_df.iloc[i]['type'] + result_data = result_df.iloc[i]['data'] + + self.assertEqual(result_id, original_id, f"Row {i+1}: ID should match") + self.assertEqual(result_type, original_type, f"Row {i+1}: Type should match") + self.assertEqual(result_data, original_data, f"Row {i+1}: Blob data should match") + + def test_data_blob_writer_empty_batches(self): + """Test DataBlobWriter with empty batches.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('data', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.empty_batches_test', schema, False) + table = self.catalog.get_table('test_db.empty_batches_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Test data with some empty batches + batch1_data = pa.Table.from_pydict({ + 'id': [1, 2], + 'data': [b'data1', b'data2'] + }, schema=pa_schema) + + # Empty batch + empty_batch = pa.Table.from_pydict({ + 'id': [], 
+ 'data': [] + }, schema=pa_schema) + + batch2_data = pa.Table.from_pydict({ + 'id': [3], + 'data': [b'data3'] + }, schema=pa_schema) + + # Write batches including empty ones + total_rows = 0 + for batch in batch1_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + for batch in empty_batch.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + for batch in batch2_data.to_batches(): + blob_writer.write_arrow_batch(batch) + total_rows += batch.num_rows + + # Verify record count (empty batch should not affect count) + # Record count is tracked internally by DataBlobWriter + # Record count is tracked internally by DataBlobWriter + + # Test prepare commit + commit_messages = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + + blob_writer.close() + + def test_data_blob_writer_rolling_behavior(self): + """Test DataBlobWriter rolling behavior with multiple commits.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('content', pa.string()), + ('blob_data', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.rolling_test', schema, False) + table = self.catalog.get_table('test_db.rolling_test') + + # Use proper table API to create writer + write_builder = table.new_batch_write_builder() + blob_writer = write_builder.new_write() + + # Create data that should trigger rolling + large_content = 'X' * 1000 # Large string content + large_blob = b'B' * 5000 # Large blob data + + # Write multiple batches to test rolling + for i in range(10): # 10 batches + batch_data = pa.Table.from_pydict({ + 'id': [i * 10 + j for j in range(10)], + 'content': [f'{large_content}_{i}_{j}' for j in range(10)], + 'blob_data': [large_blob] * 10 + }, schema=pa_schema) + + for batch in batch_data.to_batches(): + blob_writer.write_arrow_batch(batch) + + # Verify total record count + # Record count is tracked internally by DataBlobWriter + + # Test prepare commit + commit_messages = blob_writer.prepare_commit() + self.assertIsInstance(commit_messages, list) + + # Verify we have committed files + self.assertGreater(len(commit_messages), 0) + + # Verify file metadata structure + for commit_msg in commit_messages: + for file_meta in commit_msg.new_files: + self.assertIsNotNone(file_meta.file_name) + self.assertGreater(file_meta.file_size, 0) + self.assertGreater(file_meta.row_count, 0) + + blob_writer.close() + + def test_blob_write_read_end_to_end(self): + """Test complete end-to-end blob functionality: write blob data and read it back to verify correctness.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('description', pa.string()), + ('blob_data', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_write_read_e2e', schema, False) + table = self.catalog.get_table('test_db.blob_write_read_e2e') + + # Test data with various blob sizes and types + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3, 4, 5], + 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], + 'description': ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], + 'blob_data': [ + b'small_blob_1', + 
b'medium_blob_data_2_with_more_content', + b'large_blob_data_3_with_even_more_content_and_details', + b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501 + b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' # noqa: E501 + ] + }, schema=pa_schema) + + # Write data using table API + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(test_data) + + # Commit the data + commit_messages = writer.prepare_commit() + self.assertGreater(len(commit_messages), 0) + + # Verify commit message structure + for commit_msg in commit_messages: + self.assertIsInstance(commit_msg.new_files, list) + self.assertGreater(len(commit_msg.new_files), 0) + + # Should have both normal and blob files + file_names = [f.file_name for f in commit_msg.new_files] + parquet_files = [f for f in file_names if f.endswith('.parquet')] + blob_files = [f for f in file_names if f.endswith('.blob')] + + self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") + self.assertGreater(len(blob_files), 0, "Should have at least one blob file") + + # Create commit and commit the data + commit = write_builder.new_commit() + commit.commit(commit_messages) + writer.close() + + # Read data back using table API + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + result = table_read.to_arrow(table_scan.plan().splits()) + + # Verify the data was read back correctly + self.assertEqual(result.num_rows, 5, "Should have 5 rows") + self.assertEqual(result.num_columns, 4, "Should have 4 columns") + + # Verify normal columns + self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match") + self.assertEqual(result.column('name').to_pylist(), ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], "Name column should match") # noqa: E501 + self.assertEqual(result.column('description').to_pylist(), ['User 1', 'User 2', 'User 3', 'User 4', 'User 5'], "Description column should match") # noqa: E501 + + # Verify blob data correctness + blob_data = result.column('blob_data').to_pylist() + expected_blobs = [ + b'small_blob_1', + b'medium_blob_data_2_with_more_content', + b'large_blob_data_3_with_even_more_content_and_details', + b'very_large_blob_data_4_with_extensive_content_and_multiple_details_here', # noqa: E501 + b'extremely_large_blob_data_5_with_comprehensive_content_and_extensive_details_covering_multiple_aspects' # noqa: E501 + ] + + self.assertEqual(len(blob_data), 5, "Should have 5 blob records") + self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly") + + # Verify individual blob sizes + for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)): + self.assertEqual(len(actual_blob), len(expected_blob), f"Blob {i+1} size should match") + self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly") + + print(f"✅ End-to-end blob write/read test passed: wrote and read back {len(blob_data)} blob records correctly") # noqa: E501 + + def test_blob_write_read_large_data_end_to_end(self): + """Test end-to-end blob functionality with large blob data (1MB per blob).""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('metadata', pa.string()), + ('large_blob', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 
'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_large_write_read_e2e', schema, False) + table = self.catalog.get_table('test_db.blob_large_write_read_e2e') + + # Create large blob data (1MB per blob) + large_blob_size = 1024 * 1024 # 1MB + blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern + pattern_size = len(blob_pattern) + repetitions = large_blob_size // pattern_size + large_blob_data = blob_pattern * repetitions + + # Test data with large blobs + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'metadata': ['Large blob 1', 'Large blob 2', 'Large blob 3'], + 'large_blob': [large_blob_data, large_blob_data, large_blob_data] + }, schema=pa_schema) + + # Write data + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(test_data) + + # Commit the data + commit_messages = writer.prepare_commit() + self.assertGreater(len(commit_messages), 0) + + # Verify commit message structure + for commit_msg in commit_messages: + self.assertIsInstance(commit_msg.new_files, list) + self.assertGreater(len(commit_msg.new_files), 0) + + # Should have both normal and blob files + file_names = [f.file_name for f in commit_msg.new_files] + parquet_files = [f for f in file_names if f.endswith('.parquet')] + blob_files = [f for f in file_names if f.endswith('.blob')] + + self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") + self.assertGreater(len(blob_files), 0, "Should have at least one blob file") + + commit = write_builder.new_commit() + commit.commit(commit_messages) + writer.close() + + # Read data back + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + result = table_read.to_arrow(table_scan.plan().splits()) + + # Verify the data + self.assertEqual(result.num_rows, 3, "Should have 3 rows") + self.assertEqual(result.num_columns, 3, "Should have 3 columns") + + # Verify normal columns + self.assertEqual(result.column('id').to_pylist(), [1, 2, 3], "ID column should match") + self.assertEqual(result.column('metadata').to_pylist(), ['Large blob 1', 'Large blob 2', 'Large blob 3'], "Metadata column should match") # noqa: E501 + + # Verify blob data integrity + blob_data = result.column('large_blob').to_pylist() + self.assertEqual(len(blob_data), 3, "Should have 3 blob records") + + for i, blob in enumerate(blob_data): + self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} should be {large_blob_size} bytes") + self.assertEqual(blob, large_blob_data, f"Blob {i+1} content should match exactly") + print(f"✅ Verified large blob {i+1}: {len(blob)} bytes") + + print(f"✅ Large blob end-to-end test passed: wrote and read back {len(blob_data)} large blob records correctly") # noqa: E501 + + def test_blob_write_read_mixed_sizes_end_to_end(self): + """Test end-to-end blob functionality with mixed blob sizes.""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('size_category', pa.string()), + ('blob_data', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_mixed_sizes_write_read_e2e', schema, False) + table = self.catalog.get_table('test_db.blob_mixed_sizes_write_read_e2e') + + # Create blobs of different sizes + tiny_blob = b'tiny' + small_blob = 
b'small_blob_data' * 10 # ~140 bytes + medium_blob = b'medium_blob_data' * 100 # ~1.4KB + large_blob = b'large_blob_data' * 1000 # ~14KB + huge_blob = b'huge_blob_data' * 10000 # ~140KB + + test_data = pa.Table.from_pydict({ + 'id': [1, 2, 3, 4, 5], + 'size_category': ['tiny', 'small', 'medium', 'large', 'huge'], + 'blob_data': [tiny_blob, small_blob, medium_blob, large_blob, huge_blob] + }, schema=pa_schema) + + # Write data + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(test_data) + + # Commit + commit_messages = writer.prepare_commit() + self.assertGreater(len(commit_messages), 0) + + # Verify commit message structure + for commit_msg in commit_messages: + self.assertIsInstance(commit_msg.new_files, list) + self.assertGreater(len(commit_msg.new_files), 0) + + # Should have both normal and blob files + file_names = [f.file_name for f in commit_msg.new_files] + parquet_files = [f for f in file_names if f.endswith('.parquet')] + blob_files = [f for f in file_names if f.endswith('.blob')] + + self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") + self.assertGreater(len(blob_files), 0, "Should have at least one blob file") + + commit = write_builder.new_commit() + commit.commit(commit_messages) + writer.close() + + # Read back + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + result = table_read.to_arrow(table_scan.plan().splits()) + + # Verify + self.assertEqual(result.num_rows, 5, "Should have 5 rows") + self.assertEqual(result.num_columns, 3, "Should have 3 columns") + + # Verify normal columns + self.assertEqual(result.column('id').to_pylist(), [1, 2, 3, 4, 5], "ID column should match") + self.assertEqual(result.column('size_category').to_pylist(), ['tiny', 'small', 'medium', 'large', 'huge'], "Size category column should match") # noqa: E501 + + # Verify blob data + blob_data = result.column('blob_data').to_pylist() + expected_blobs = [tiny_blob, small_blob, medium_blob, large_blob, huge_blob] + + self.assertEqual(len(blob_data), 5, "Should have 5 blob records") + self.assertEqual(blob_data, expected_blobs, "Blob data should match exactly") + + # Verify sizes + sizes = [len(blob) for blob in blob_data] + expected_sizes = [len(blob) for blob in expected_blobs] + self.assertEqual(sizes, expected_sizes, "Blob sizes should match") + + # Verify individual blob content + for i, (actual_blob, expected_blob) in enumerate(zip(blob_data, expected_blobs)): + self.assertEqual(actual_blob, expected_blob, f"Blob {i+1} content should match exactly") + + print(f"✅ Mixed sizes end-to-end test passed: wrote and read back blobs ranging from {min(sizes)} to {max(sizes)} bytes") # noqa: E501 + + def test_blob_write_read_large_data_end_to_end_with_rolling(self): + """Test end-to-end blob functionality with large blob data (50MB per blob) and rolling behavior (40 blobs).""" + from pypaimon import Schema + + # Create schema with blob column + pa_schema = pa.schema([ + ('id', pa.int32()), + ('batch_id', pa.int32()), + ('metadata', pa.string()), + ('large_blob', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true' + } + ) + self.catalog.create_table('test_db.blob_large_rolling_e2e', schema, False) + table = self.catalog.get_table('test_db.blob_large_rolling_e2e') + + # Create large blob data (50MB per blob) + large_blob_size = 50 * 1024 * 1024 # 
50MB + blob_pattern = b'LARGE_BLOB_PATTERN_' + b'X' * 1024 # ~1KB pattern + pattern_size = len(blob_pattern) + repetitions = large_blob_size // pattern_size + large_blob_data = blob_pattern * repetitions + + # Verify the blob size is exactly 50MB + actual_size = len(large_blob_data) + print(f"Created blob data: {actual_size:,} bytes ({actual_size / (1024*1024):.2f} MB)") + + # Write 40 batches of data (each with 1 blob of 50MB) + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + + # Write all 40 batches first + for batch_id in range(40): + # Create test data for this batch + test_data = pa.Table.from_pydict({ + 'id': [batch_id + 1], + 'batch_id': [batch_id], + 'metadata': [f'Large blob batch {batch_id + 1}'], + 'large_blob': [large_blob_data] + }, schema=pa_schema) + + # Write data + writer.write_arrow(test_data) + + # Print progress every 10 batches + if (batch_id + 1) % 10 == 0: + print(f"✅ Written batch {batch_id + 1}/40: {len(large_blob_data):,} bytes") + + print("✅ Successfully wrote all 40 batches of 50MB blobs") + + # Commit all data at once + commit_messages = writer.prepare_commit() + self.assertGreater(len(commit_messages), 0) + + # Verify commit message structure + for commit_msg in commit_messages: + self.assertIsInstance(commit_msg.new_files, list) + self.assertGreater(len(commit_msg.new_files), 0) + + # Should have both normal and blob files + file_names = [f.file_name for f in commit_msg.new_files] + parquet_files = [f for f in file_names if f.endswith('.parquet')] + blob_files = [f for f in file_names if f.endswith('.blob')] + + self.assertGreater(len(parquet_files), 0, "Should have at least one parquet file") + self.assertGreater(len(blob_files), 0, "Should have at least one blob file") + + # Commit the data + commit = write_builder.new_commit() + commit.commit(commit_messages) + writer.close() + + print(f"✅ Successfully committed {len(commit_messages)} commit messages with 40 batches of 50MB blobs") + + # Read data back + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + result = table_read.to_arrow(table_scan.plan().splits()) + + # Verify the data + self.assertEqual(result.num_rows, 40, "Should have 40 rows") + self.assertEqual(result.num_columns, 4, "Should have 4 columns") + + # Verify normal columns + expected_ids = list(range(1, 41)) + expected_batch_ids = list(range(40)) + expected_metadata = [f'Large blob batch {i}' for i in range(1, 41)] + + self.assertEqual(result.column('id').to_pylist(), expected_ids, "ID column should match") + self.assertEqual(result.column('batch_id').to_pylist(), expected_batch_ids, "Batch ID column should match") # noqa: E501 + self.assertEqual(result.column('metadata').to_pylist(), expected_metadata, "Metadata column should match") # noqa: E501 + + # Verify blob data integrity + blob_data = result.column('large_blob').to_pylist() + self.assertEqual(len(blob_data), 40, "Should have 40 blob records") + + # Verify each blob + for i, blob in enumerate(blob_data): + self.assertEqual(len(blob), len(large_blob_data), f"Blob {i+1} should be {large_blob_size:,} bytes") + self.assertEqual(blob, large_blob_data, f"Blob {i+1} content should match exactly") + + # Print progress every 10 blobs + if (i + 1) % 10 == 0: + print(f"✅ Verified blob {i+1}/40: {len(blob):,} bytes") + + # Verify total data size + total_blob_size = sum(len(blob) for blob in blob_data) + expected_total_size = 40 * len(large_blob_data) + self.assertEqual(total_blob_size, 
expected_total_size, + f"Total blob size should be {expected_total_size:,} bytes") + + print("✅ Large blob rolling end-to-end test passed:") + print(" - Wrote and read back 40 blobs of 50MB each") + print(f" - Total data size: {total_blob_size:,} bytes ({total_blob_size / (1024*1024*1024):.2f} GB)") # noqa: E501 + print(" - All blob content verified as correct") + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 4e5b4d723e..97804bbf6b 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -21,6 +21,7 @@ import uuid from pathlib import Path from typing import List +from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate_builder import PredicateBuilder from pypaimon.manifest.manifest_file_manager import ManifestFileManager from pypaimon.manifest.manifest_list_manager import ManifestListManager @@ -134,7 +135,7 @@ class FileStoreCommit: new_snapshot_id = self._generate_snapshot_id() # Check if row tracking is enabled - row_tracking_enabled = self.table.options.get('row-tracking.enabled', 'false').lower() == 'true' + row_tracking_enabled = self.table.options.get(CoreOptions.ROW_TRACKING_ENABLED, 'false').lower() == 'true' # Apply row tracking logic if enabled next_row_id = None diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index 841fef3a65..35b4a7d980 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -21,6 +21,7 @@ import pyarrow as pa from pypaimon.write.commit_message import CommitMessage from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter +from pypaimon.write.writer.data_blob_writer import DataBlobWriter from pypaimon.write.writer.data_writer import DataWriter from pypaimon.write.writer.key_value_data_writer import KeyValueDataWriter @@ -44,7 +45,15 @@ class FileStoreWrite: writer.write(data) def _create_data_writer(self, partition: Tuple, bucket: int) -> DataWriter: - if self.table.is_primary_key_table: + # Check if table has blob columns + if self._has_blob_columns(): + return DataBlobWriter( + table=self.table, + partition=partition, + bucket=bucket, + max_seq_number=self.max_seq_numbers.get((partition, bucket), 1), + ) + elif self.table.is_primary_key_table: return KeyValueDataWriter( table=self.table, partition=partition, @@ -60,6 +69,17 @@ class FileStoreWrite: write_cols=self.write_cols ) + def _has_blob_columns(self) -> bool: + """Check if the table schema contains blob columns.""" + for field in self.table.table_schema.fields: + # Check if field type is blob + if hasattr(field.type, 'type') and field.type.type == 'BLOB': + return True + # Alternative: check for specific blob type class + elif hasattr(field.type, '__class__') and 'blob' in field.type.__class__.__name__.lower(): + return True + return False + def prepare_commit(self) -> List[CommitMessage]: commit_messages = [] for (partition, bucket), writer in self.data_writers.items(): diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/write/writer/blob_writer.py similarity index 50% copy from paimon-python/pypaimon/common/core_options.py copy to paimon-python/pypaimon/write/writer/blob_writer.py index 8ab26fe062..ff153da843 100644 --- a/paimon-python/pypaimon/common/core_options.py +++ 
b/paimon-python/pypaimon/write/writer/blob_writer.py @@ -16,36 +16,31 @@ # limitations under the License. ################################################################################ -from enum import Enum - - -class CoreOptions(str, Enum): - """Core options for paimon.""" - - def __str__(self): - return self.value - - # Basic options - AUTO_CREATE = "auto-create" - PATH = "path" - TYPE = "type" - BRANCH = "branch" - BUCKET = "bucket" - BUCKET_KEY = "bucket-key" - WAREHOUSE = "warehouse" - # File format options - FILE_FORMAT = "file.format" - FILE_FORMAT_ORC = "orc" - FILE_FORMAT_AVRO = "avro" - FILE_FORMAT_PARQUET = "parquet" - FILE_FORMAT_BLOB = "blob" - FILE_COMPRESSION = "file.compression" - FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level" - FILE_FORMAT_PER_LEVEL = "file.format.per.level" - FILE_BLOCK_SIZE = "file.block-size" - FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor" - # Scan options - SCAN_FALLBACK_BRANCH = "scan.fallback-branch" - INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp" - # Commit options - COMMIT_USER_PREFIX = "commit.user-prefix" +import logging +from typing import Tuple + +from pypaimon.common.core_options import CoreOptions +from pypaimon.write.writer.append_only_data_writer import AppendOnlyDataWriter + +logger = logging.getLogger(__name__) + + +class BlobWriter(AppendOnlyDataWriter): + + def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, blob_column: str): + super().__init__(table, partition, bucket, max_seq_number, [blob_column]) + + # Override file format to "blob" + self.file_format = CoreOptions.FILE_FORMAT_BLOB + + logger.info("Initialized BlobWriter with blob file format") + + @staticmethod + def _get_column_stats(record_batch, column_name: str): + column_array = record_batch.column(column_name) + # For blob data, don't generate min/max values + return { + "min_values": None, + "max_values": None, + "null_counts": column_array.null_count, + } diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py new file mode 100644 index 0000000000..e34b9a5701 --- /dev/null +++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py @@ -0,0 +1,321 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +import logging +import uuid +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Tuple + +import pyarrow as pa + +from pypaimon.common.core_options import CoreOptions +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.table.row.generic_row import GenericRow +from pypaimon.write.writer.data_writer import DataWriter + +logger = logging.getLogger(__name__) + + +class DataBlobWriter(DataWriter): + """ + A rolling file writer that handles both normal data and blob data. This writer creates separate + files for normal columns and blob columns, managing their lifecycle independently. + + For example, given a table schema with normal columns (id INT, name STRING) and a blob column + (data BLOB), this writer will create separate files for (id, name) and (data). + + Key features: + - Blob data can roll independently when normal data doesn't need rolling + - When normal data rolls, blob data MUST also be closed (Java behavior) + - Blob data uses more aggressive rolling (smaller target size) to prevent memory issues + - One normal data file may correspond to multiple blob data files + - Blob data is written immediately to disk to prevent memory corruption + - Blob file metadata is stored as separate DataFileMeta objects after normal file metadata + + Rolling behavior: + - Normal data rolls: Both normal and blob writers are closed together, blob metadata added after normal metadata + - Blob data rolls independently: Only blob writer is closed, blob metadata is cached until normal data rolls + + Metadata organization: + - Normal file metadata is added first to committed_files + - Blob file metadata is added after normal file metadata in committed_files + - When blob rolls independently, metadata is cached until normal data rolls + - Result: [normal_meta, blob_meta1, blob_meta2, blob_meta3, ...] + + Example file organization: + committed_files = [ + normal_file1_meta, # f1.parquet metadata + blob_file1_meta, # b1.blob metadata + blob_file2_meta, # b2.blob metadata + blob_file3_meta, # b3.blob metadata + normal_file2_meta, # f1-2.parquet metadata + blob_file4_meta, # b4.blob metadata + blob_file5_meta, # b5.blob metadata + ] + + This matches the Java RollingBlobFileWriter behavior exactly. 
+ """ + + # Constant for checking rolling condition periodically + CHECK_ROLLING_RECORD_CNT = 1000 + + def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int): + super().__init__(table, partition, bucket, max_seq_number) + + # Determine blob column from table schema + self.blob_column_name = self._get_blob_columns_from_schema() + + # Split schema into normal and blob columns + all_column_names = [field.name for field in self.table.table_schema.fields] + self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name] + self.write_cols = self.normal_column_names + + # State management for blob writer + self.record_count = 0 + self.closed = False + + # Track pending data for normal data only + self.pending_normal_data: Optional[pa.Table] = None + + # Initialize blob writer with blob column name + from pypaimon.write.writer.blob_writer import BlobWriter + self.blob_writer = BlobWriter( + table=self.table, + partition=self.partition, + bucket=self.bucket, + max_seq_number=max_seq_number, + blob_column=self.blob_column_name + ) + + logger.info(f"Initialized DataBlobWriter with blob column: {self.blob_column_name}") + + def _get_blob_columns_from_schema(self) -> str: + blob_columns = [] + for field in self.table.table_schema.fields: + type_str = str(field.type).lower() + if 'blob' in type_str: + blob_columns.append(field.name) + + # Validate blob column count (matching Java constraint) + if len(blob_columns) == 0: + raise ValueError("No blob field found in table schema.") + elif len(blob_columns) > 1: + raise ValueError("Limit exactly one blob field in one paimon table yet.") + + return blob_columns[0] # Return single blob column name + + def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: + normal_data, _ = self._split_data(data) + return normal_data + + def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: + return self._merge_normal_data(existing_data, new_data) + + def write(self, data: pa.RecordBatch): + try: + # Split data into normal and blob parts + normal_data, blob_data = self._split_data(data) + + # Process and accumulate normal data + processed_normal = self._process_normal_data(normal_data) + if self.pending_normal_data is None: + self.pending_normal_data = processed_normal + else: + self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal) + + # Write blob data directly to blob writer (handles its own rolling) + if blob_data is not None and blob_data.num_rows > 0: + # Write blob data directly to blob writer + self.blob_writer.write(blob_data) + + self.record_count += data.num_rows + + # Check if normal data rolling is needed + if self._should_roll_normal(): + # When normal data rolls, close both writers and fetch blob metadata + self._close_current_writers() + + except Exception as e: + logger.error("Exception occurs when writing data. Cleaning up.", exc_info=e) + self.abort() + raise e + + def prepare_commit(self) -> List[DataFileMeta]: + # Close any remaining data + self._close_current_writers() + + return self.committed_files.copy() + + def close(self): + if self.closed: + return + + try: + if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0: + self._close_current_writers() + except Exception as e: + logger.error("Exception occurs when closing writer. 
Cleaning up.", exc_info=e) + self.abort() + finally: + self.closed = True + self.pending_normal_data = None + + def abort(self): + """Abort all writers and clean up resources.""" + self.blob_writer.abort() + self.pending_normal_data = None + self.committed_files.clear() + + def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]: + """Split data into normal and blob parts based on column names.""" + # Use the pre-computed column names + normal_columns = self.normal_column_names + blob_columns = [self.blob_column_name] # Single blob column + + # Create projected batches + normal_data = data.select(normal_columns) if normal_columns else None + blob_data = data.select(blob_columns) if blob_columns else None + + return normal_data, blob_data + + def _process_normal_data(self, data: pa.RecordBatch) -> pa.Table: + """Process normal data (similar to base DataWriter).""" + if data is None or data.num_rows == 0: + return pa.Table.from_batches([]) + return pa.Table.from_batches([data]) + + def _merge_normal_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: + return pa.concat_tables([existing_data, new_data]) + + def _should_roll_normal(self) -> bool: + if self.pending_normal_data is None: + return False + + # Check rolling condition periodically (every CHECK_ROLLING_RECORD_CNT records) + if self.record_count % self.CHECK_ROLLING_RECORD_CNT != 0: + return False + + # Check if normal data exceeds target size + current_size = self.pending_normal_data.nbytes + return current_size > self.target_file_size + + def _close_current_writers(self): + """Close both normal and blob writers and add blob metadata after normal metadata (Java behavior).""" + if self.pending_normal_data is None or self.pending_normal_data.num_rows == 0: + return + + # Close normal writer and get metadata + normal_meta = self._write_normal_data_to_file(self.pending_normal_data) + + # Fetch blob metadata from blob writer + blob_metas = self.blob_writer.prepare_commit() + + # Validate consistency between normal and blob files (Java behavior) + self._validate_consistency(normal_meta, blob_metas) + + # Add normal file metadata first + self.committed_files.append(normal_meta) + + # Add blob file metadata after normal metadata + self.committed_files.extend(blob_metas) + + # Reset pending data + self.pending_normal_data = None + + logger.info(f"Closed both writers - normal: {normal_meta.file_name}, " + f"added {len(blob_metas)} blob file metadata after normal metadata") + + def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta: + if data.num_rows == 0: + return None + + file_name = f"data-{uuid.uuid4()}-0.{self.file_format}" + file_path = self._generate_file_path(file_name) + + # Write file based on format + if self.file_format == CoreOptions.FILE_FORMAT_PARQUET: + self.file_io.write_parquet(file_path, data, compression=self.compression) + elif self.file_format == CoreOptions.FILE_FORMAT_ORC: + self.file_io.write_orc(file_path, data, compression=self.compression) + elif self.file_format == CoreOptions.FILE_FORMAT_AVRO: + self.file_io.write_avro(file_path, data) + else: + raise ValueError(f"Unsupported file format: {self.file_format}") + + # Generate metadata + return self._create_data_file_meta(file_name, file_path, data) + + def _create_data_file_meta(self, file_name: str, file_path: Path, data: pa.Table) -> DataFileMeta: + # Column stats (only for normal columns) + column_stats = { + field.name: self._get_column_stats(data, field.name) + for field in 
self.table.table_schema.fields + if field.name != self.blob_column_name + } + + # Get normal fields only + normal_fields = [field for field in self.table.table_schema.fields + if field.name != self.blob_column_name] + + min_value_stats = [column_stats[field.name]['min_values'] for field in normal_fields] + max_value_stats = [column_stats[field.name]['max_values'] for field in normal_fields] + value_null_counts = [column_stats[field.name]['null_counts'] for field in normal_fields] + + self.sequence_generator.start = self.sequence_generator.current + + return DataFileMeta( + file_name=file_name, + file_size=self.file_io.get_file_size(file_path), + row_count=data.num_rows, + min_key=GenericRow([], []), + max_key=GenericRow([], []), + key_stats=SimpleStats( + GenericRow([], []), + GenericRow([], []), + []), + value_stats=SimpleStats( + GenericRow(min_value_stats, normal_fields), + GenericRow(max_value_stats, normal_fields), + value_null_counts), + min_sequence_number=-1, + max_sequence_number=-1, + schema_id=self.table.table_schema.id, + level=0, + extra_files=[], + creation_time=datetime.now(), + delete_row_count=0, + file_source="APPEND", + value_stats_cols=self.normal_column_names, + file_path=str(file_path), + write_cols=self.write_cols) + + def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]): + if normal_meta is None: + return + + normal_row_count = normal_meta.row_count + blob_row_count = sum(meta.row_count for meta in blob_metas) + + if normal_row_count != blob_row_count: + raise RuntimeError( + f"This is a bug: The row count of main file and blob files does not match. " + f"Main file: {normal_meta.file_name} (row count: {normal_row_count}), " + f"blob files: {[meta.file_name for meta in blob_metas]} (total row count: {blob_row_count})" + ) diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 0a063fb2f4..502d196ae6 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -61,14 +61,21 @@ class DataWriter(ABC): self.blob_as_descriptor = options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) def write(self, data: pa.RecordBatch): - processed_data = self._process_data(data) + try: + processed_data = self._process_data(data) - if self.pending_data is None: - self.pending_data = processed_data - else: - self.pending_data = self._merge_data(self.pending_data, processed_data) + if self.pending_data is None: + self.pending_data = processed_data + else: + self.pending_data = self._merge_data(self.pending_data, processed_data) - self._check_and_roll_if_needed() + self._check_and_roll_if_needed() + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.warning("Exception occurs when writing data. Cleaning up.", exc_info=e) + self.abort() + raise e def prepare_commit(self) -> List[DataFileMeta]: if self.pending_data is not None and self.pending_data.num_rows > 0: @@ -78,6 +85,36 @@ class DataWriter(ABC): return self.committed_files.copy() def close(self): + try: + if self.pending_data is not None and self.pending_data.num_rows > 0: + self._write_data_to_file(self.pending_data) + except Exception as e: + import logging + logger = logging.getLogger(__name__) + logger.warning("Exception occurs when closing writer. 
Cleaning up.", exc_info=e)
+            self.abort()
+            raise e
+        finally:
+            self.pending_data = None
+            # Note: Don't clear committed_files in close() - they should be returned by prepare_commit()
+
+    def abort(self):
+        """
+        Abort all writers and clean up resources. This method should be called when an error occurs
+        during writing. It deletes any files that were written and cleans up resources.
+        """
+        # Delete any files that were written
+        for file_meta in self.committed_files:
+            try:
+                if file_meta.file_path:
+                    self.file_io.delete_quietly(file_meta.file_path)
+            except Exception as e:
+                # Log but don't raise - we want to clean up as much as possible
+                import logging
+                logger = logging.getLogger(__name__)
+                logger.warning(f"Failed to delete file {file_meta.file_path} during abort: {e}")
+
+        # Clean up resources
         self.pending_data = None
         self.committed_files.clear()
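
For readers following the metadata layout described in the DataBlobWriter docstring (normal file metadata followed by the blob file metadata produced while that normal file was open), the sketch below is not part of this patch: it shows how such a flat committed_files list could be regrouped into (normal file, trailing blob files) pairs and how the row-count invariant enforced by _validate_consistency could be re-checked after the fact. The helper names are illustrative, and detecting blob files by the ".blob" suffix mirrors the assertions in the tests above.

from typing import List, Tuple

from pypaimon.manifest.schema.data_file_meta import DataFileMeta


def group_committed_files(files: List[DataFileMeta]) -> List[Tuple[DataFileMeta, List[DataFileMeta]]]:
    # Pair each normal data file with the blob files appended after it in the list.
    groups: List[Tuple[DataFileMeta, List[DataFileMeta]]] = []
    for meta in files:
        if meta.file_name.endswith('.blob'):
            if not groups:
                raise ValueError("blob file metadata found before any normal file metadata")
            groups[-1][1].append(meta)
        else:
            groups.append((meta, []))
    return groups


def check_row_count_invariant(files: List[DataFileMeta]) -> None:
    # Re-check the invariant that a normal file's row count equals the total
    # row count of the blob files written alongside it.
    for normal_meta, blob_metas in group_committed_files(files):
        blob_rows = sum(m.row_count for m in blob_metas)
        if blob_metas and blob_rows != normal_meta.row_count:
            raise RuntimeError(
                f"{normal_meta.file_name} has {normal_meta.row_count} rows, "
                f"but its blob files carry {blob_rows} rows in total")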
