This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
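In brief, the commit below lets separate writers populate different columns of the same rows (aligned by first_row_id) and have those per-column files merged back into full rows on read. A condensed sketch of that flow, drawn from the new test_basic in data_evolution_test.py further down; the catalog and table setup mirrors the tests' setUpClass, and the table name 'default.demo' plus the temp warehouse path are illustrative, not part of the commit:

    import os
    import tempfile

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    # Catalog and table setup, mirroring the tests' setUpClass.
    warehouse = os.path.join(tempfile.mkdtemp(), 'warehouse')
    catalog = CatalogFactory.create({'warehouse': warehouse})
    catalog.create_database('default', False)

    pa_schema = pa.schema([('f0', pa.int8()), ('f1', pa.int16())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
    catalog.create_table('default.demo', schema, False)
    table = catalog.get_table('default.demo')

    # Commit 1: write both columns.
    write_builder = table.new_batch_write_builder()
    write = write_builder.new_write()
    commit = write_builder.new_commit()
    write.write_arrow(pa.Table.from_pydict(
        {'f0': [-1, 2], 'f1': [-1001, 1002]}, schema=pa_schema))
    commit.commit(write.prepare_commit())
    write.close()
    commit.close()

    # Commit 2: rewrite only 'f0' via the new with_write_type(), and point the new
    # file at the existing rows by setting first_row_id = 0 on its file metadata.
    write = write_builder.new_write().with_write_type(['f0'])
    commit = write_builder.new_commit()
    write.write_arrow(pa.Table.from_pydict(
        {'f0': [3, 4]}, schema=pa.schema([('f0', pa.int8())])))
    messages = write.prepare_commit()
    messages[0].new_files[0].first_row_id = 0
    commit.commit(messages)
    write.close()
    commit.close()

    # Reading merges fields across the two files: f0 from the newer file, f1 from the original.
    read_builder = table.new_read_builder()
    result = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
    print(result)  # expect f0 = [3, 4], f1 = [-1001, 1002]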
commit 0e05b7eb15b3d9947ba673f4b5f63702b6885a53 Author: umi <[email protected]> AuthorDate: Wed Oct 15 10:54:23 2025 +0800 [Python] Enable field merge read in row-tracking table (#6399) --- .../pypaimon/manifest/manifest_file_manager.py | 16 +- .../pypaimon/manifest/schema/data_file_meta.py | 61 ++- .../pypaimon/manifest/schema/manifest_entry.py | 20 + .../read/reader/data_evolution_merge_reader.py | 85 ++++ ..._record_reader.py => data_file_batch_reader.py} | 0 paimon-python/pypaimon/read/reader/field_bunch.py | 120 +++++ paimon-python/pypaimon/read/split_read.py | 196 ++++++++- paimon-python/pypaimon/read/table_read.py | 10 +- paimon-python/pypaimon/read/table_scan.py | 106 +++++ paimon-python/pypaimon/snapshot/snapshot.py | 1 + .../pypaimon/tests/data_evolution_test.py | 483 +++++++++++++++++++++ .../pypaimon/tests/file_store_commit_test.py | 24 +- .../pypaimon/tests/py36/data_evolution_test.py | 483 +++++++++++++++++++++ .../pypaimon/tests/py36/rest_ao_read_write_test.py | 6 +- paimon-python/pypaimon/tests/reader_base_test.py | 6 +- .../pypaimon/tests/rest/rest_read_write_test.py | 2 +- paimon-python/pypaimon/write/batch_table_write.py | 19 +- paimon-python/pypaimon/write/file_store_commit.py | 75 ++++ paimon-python/pypaimon/write/file_store_write.py | 2 + paimon-python/pypaimon/write/writer/data_writer.py | 22 +- 20 files changed, 1708 insertions(+), 29 deletions(-) diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index f4b0ab0be3..e3c9601cf4 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -60,9 +60,13 @@ class ManifestFileManager: null_counts=key_dict['_NULL_COUNTS'], ) value_dict = dict(file_dict['_VALUE_STATS']) - if file_dict.get('_VALUE_STATS_COLS') is None: - fields = self.table.table_schema.fields - elif not file_dict.get('_VALUE_STATS_COLS'): + if file_dict['_VALUE_STATS_COLS'] is None: + if file_dict['_WRITE_COLS'] is None: + fields = self.table.table_schema.fields + else: + read_fields = file_dict['_WRITE_COLS'] + fields = [self.table.field_dict[col] for col in read_fields] + elif not file_dict['_VALUE_STATS_COLS']: fields = [] else: fields = [self.table.field_dict[col] for col in file_dict['_VALUE_STATS_COLS']] @@ -89,6 +93,9 @@ class ManifestFileManager: embedded_index=file_dict['_EMBEDDED_FILE_INDEX'], file_source=file_dict['_FILE_SOURCE'], value_stats_cols=file_dict.get('_VALUE_STATS_COLS'), + external_path=file_dict.get('_EXTERNAL_PATH'), + first_row_id=file_dict['_FIRST_ROW_ID'], + write_cols=file_dict['_WRITE_COLS'], ) entry = ManifestEntry( kind=record['_KIND'], @@ -137,6 +144,9 @@ class ManifestFileManager: "_EMBEDDED_FILE_INDEX": entry.file.embedded_index, "_FILE_SOURCE": entry.file.file_source, "_VALUE_STATS_COLS": entry.file.value_stats_cols, + "_EXTERNAL_PATH": entry.file.external_path, + "_FIRST_ROW_ID": entry.file.first_row_id, + "_WRITE_COLS": entry.file.write_cols, } } avro_records.append(avro_record) diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py index 82dfb66918..1d1bcb56fb 100644 --- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py +++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py @@ -47,6 +47,8 @@ class DataFileMeta: file_source: Optional[str] = None value_stats_cols: Optional[List[str]] = None external_path: Optional[str] = None + first_row_id: Optional[int] = None + write_cols: 
Optional[List[str]] = None # not a schema field, just for internal usage file_path: str = None @@ -59,6 +61,58 @@ class DataFileMeta: path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name self.file_path = str(path_builder) + def assign_first_row_id(self, first_row_id: int) -> 'DataFileMeta': + """Create a new DataFileMeta with the assigned first_row_id.""" + return DataFileMeta( + file_name=self.file_name, + file_size=self.file_size, + row_count=self.row_count, + min_key=self.min_key, + max_key=self.max_key, + key_stats=self.key_stats, + value_stats=self.value_stats, + min_sequence_number=self.min_sequence_number, + max_sequence_number=self.max_sequence_number, + schema_id=self.schema_id, + level=self.level, + extra_files=self.extra_files, + creation_time=self.creation_time, + delete_row_count=self.delete_row_count, + embedded_index=self.embedded_index, + file_source=self.file_source, + value_stats_cols=self.value_stats_cols, + external_path=self.external_path, + first_row_id=first_row_id, + write_cols=self.write_cols, + file_path=self.file_path + ) + + def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'DataFileMeta': + """Create a new DataFileMeta with the assigned sequence numbers.""" + return DataFileMeta( + file_name=self.file_name, + file_size=self.file_size, + row_count=self.row_count, + min_key=self.min_key, + max_key=self.max_key, + key_stats=self.key_stats, + value_stats=self.value_stats, + min_sequence_number=min_sequence_number, + max_sequence_number=max_sequence_number, + schema_id=self.schema_id, + level=self.level, + extra_files=self.extra_files, + creation_time=self.creation_time, + delete_row_count=self.delete_row_count, + embedded_index=self.embedded_index, + file_source=self.file_source, + value_stats_cols=self.value_stats_cols, + external_path=self.external_path, + first_row_id=self.first_row_id, + write_cols=self.write_cols, + file_path=self.file_path + ) + DATA_FILE_META_SCHEMA = { "type": "record", @@ -83,9 +137,14 @@ DATA_FILE_META_SCHEMA = { "default": None}, {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None}, {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None}, - {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None}, + {"name": "_FILE_SOURCE", "type": ["null", "string"], "default": None}, {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array", "items": "string"}], "default": None}, + {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None}, + {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default": None}, + {"name": "_WRITE_COLS", + "type": ["null", {"type": "array", "items": "string"}], + "default": None}, ] } diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py index 9a02341175..9608fbbd37 100644 --- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py +++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py @@ -31,6 +31,26 @@ class ManifestEntry: total_buckets: int file: DataFileMeta + def assign_first_row_id(self, first_row_id: int) -> 'ManifestEntry': + """Create a new ManifestEntry with the assigned first_row_id.""" + return ManifestEntry( + kind=self.kind, + partition=self.partition, + bucket=self.bucket, + total_buckets=self.total_buckets, + file=self.file.assign_first_row_id(first_row_id) + ) + + def assign_sequence_number(self, min_sequence_number: int, max_sequence_number: int) -> 'ManifestEntry': + """Create a new 
ManifestEntry with the assigned sequence numbers.""" + return ManifestEntry( + kind=self.kind, + partition=self.partition, + bucket=self.bucket, + total_buckets=self.total_buckets, + file=self.file.assign_sequence_number(min_sequence_number, max_sequence_number) + ) + MANIFEST_ENTRY_SCHEMA = { "type": "record", diff --git a/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py new file mode 100644 index 0000000000..43bf926862 --- /dev/null +++ b/paimon-python/pypaimon/read/reader/data_evolution_merge_reader.py @@ -0,0 +1,85 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from typing import List, Optional + +import pyarrow as pa +from pyarrow import RecordBatch + +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader + + +class DataEvolutionMergeReader(RecordBatchReader): + """ + This is a union reader which contains multiple inner readers, Each reader is responsible for reading one file. + + This reader, assembling multiple reader into one big and great reader, will merge the batches from all readers. + + For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 0, 1, 1, 1, 0}, it means: + - The first field comes from batch0, and it is at offset 0 in batch0. + - The second field comes from batch2, and it is at offset 0 in batch2. + - The third field comes from batch0, and it is at offset 1 in batch0. + - The fourth field comes from batch1, and it is at offset 1 in batch1. + - The fifth field comes from batch2, and it is at offset 1 in batch2. + - The sixth field comes from batch1, and it is at offset 0 in batch1. 
+ """ + + def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]): + if row_offsets is None: + raise ValueError("Row offsets must not be null") + if field_offsets is None: + raise ValueError("Field offsets must not be null") + if len(row_offsets) != len(field_offsets): + raise ValueError("Row offsets and field offsets must have the same length") + if not row_offsets: + raise ValueError("Row offsets must not be empty") + if not readers or len(readers) < 1: + raise ValueError("Readers should be more than 0") + self.row_offsets = row_offsets + self.field_offsets = field_offsets + self.readers = readers + + def read_arrow_batch(self) -> Optional[RecordBatch]: + batches: List[Optional[RecordBatch]] = [None] * len(self.readers) + for i, reader in enumerate(self.readers): + if reader is not None: + batch = reader.read_arrow_batch() + if batch is None: + # all readers are aligned, as long as one returns null, the others will also have no data + return None + batches[i] = batch + # Assemble record batches from batches based on row_offsets and field_offsets + columns = [] + names = [] + for i in range(len(self.row_offsets)): + batch_index = self.row_offsets[i] + field_index = self.field_offsets[i] + if batches[batch_index] is not None: + column = batches[batch_index].column(field_index) + columns.append(column) + names.append(batches[batch_index].schema.names[field_index]) + if columns: + return pa.RecordBatch.from_arrays(columns, names) + return None + + def close(self) -> None: + try: + for reader in self.readers: + if reader is not None: + reader.close() + except Exception as e: + raise IOError("Failed to close inner readers") from e diff --git a/paimon-python/pypaimon/read/reader/data_file_record_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py similarity index 100% rename from paimon-python/pypaimon/read/reader/data_file_record_reader.py rename to paimon-python/pypaimon/read/reader/data_file_batch_reader.py diff --git a/paimon-python/pypaimon/read/reader/field_bunch.py b/paimon-python/pypaimon/read/reader/field_bunch.py new file mode 100644 index 0000000000..4ba82bd80e --- /dev/null +++ b/paimon-python/pypaimon/read/reader/field_bunch.py @@ -0,0 +1,120 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" +""" +FieldBunch classes for organizing files by field in data evolution. + +These classes help organize DataFileMeta objects into groups based on their field content, +supporting both regular data files and blob files. +""" +from abc import ABC +from typing import List +from pypaimon.manifest.schema.data_file_meta import DataFileMeta + + +class FieldBunch(ABC): + """Interface for files organized by field.""" + + def row_count(self) -> int: + """Return the total row count for this bunch.""" + ... 
+ + def files(self) -> List[DataFileMeta]: + """Return the list of files in this bunch.""" + ... + + +class DataBunch(FieldBunch): + """Files for a single data file.""" + + def __init__(self, data_file: DataFileMeta): + self.data_file = data_file + + def row_count(self) -> int: + return self.data_file.row_count + + def files(self) -> List[DataFileMeta]: + return [self.data_file] + + +class BlobBunch(FieldBunch): + """Files for partial field (blob files).""" + + def __init__(self, expected_row_count: int): + self._files: List[DataFileMeta] = [] + self.expected_row_count = expected_row_count + self.latest_first_row_id = -1 + self.expected_next_first_row_id = -1 + self.latest_max_sequence_number = -1 + self._row_count = 0 + + def add(self, file: DataFileMeta) -> None: + """Add a blob file to this bunch.""" + if not self._is_blob_file(file.file_name): + raise ValueError("Only blob file can be added to a blob bunch.") + + if file.first_row_id == self.latest_first_row_id: + if file.max_sequence_number >= self.latest_max_sequence_number: + raise ValueError( + "Blob file with same first row id should have decreasing sequence number." + ) + return + + if self._files: + first_row_id = file.first_row_id + if first_row_id < self.expected_next_first_row_id: + if file.max_sequence_number >= self.latest_max_sequence_number: + raise ValueError( + "Blob file with overlapping row id should have decreasing sequence number." + ) + return + elif first_row_id > self.expected_next_first_row_id: + raise ValueError( + f"Blob file first row id should be continuous, expect " + f"{self.expected_next_first_row_id} but got {first_row_id}" + ) + + if file.schema_id != self._files[0].schema_id: + raise ValueError( + "All files in a blob bunch should have the same schema id." + ) + if file.write_cols != self._files[0].write_cols: + raise ValueError( + "All files in a blob bunch should have the same write columns." 
+ ) + + self._files.append(file) + self._row_count += file.row_count + if self._row_count > self.expected_row_count: + raise ValueError( + f"Blob files row count exceed the expect {self.expected_row_count}" + ) + + self.latest_max_sequence_number = file.max_sequence_number + self.latest_first_row_id = file.first_row_id + self.expected_next_first_row_id = self.latest_first_row_id + file.row_count + + def row_count(self) -> int: + return self._row_count + + def files(self) -> List[DataFileMeta]: + return self._files + + @staticmethod + def _is_blob_file(file_name: str) -> bool: + """Check if a file is a blob file based on its extension.""" + return file_name.endswith('.blob') diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 7674db45f0..81ebdd86f8 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -23,11 +23,14 @@ from typing import List, Optional, Tuple from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate +from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.partition_info import PartitionInfo from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader from pypaimon.read.reader.concat_record_reader import ConcatRecordReader -from pypaimon.read.reader.data_file_record_reader import DataFileBatchReader +from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader +from pypaimon.read.reader.data_evolution_merge_reader import DataEvolutionMergeReader +from pypaimon.read.reader.field_bunch import FieldBunch, DataBunch, BlobBunch from pypaimon.read.reader.drop_delete_reader import DropDeleteRecordReader from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader from pypaimon.read.reader.filter_record_reader import FilterRecordReader @@ -298,3 +301,194 @@ class MergeFileSplitRead(SplitRead): def _get_all_data_fields(self): return self._create_key_value_fields(self.table.fields) + + +class DataEvolutionSplitRead(SplitRead): + + def create_reader(self) -> RecordReader: + files = self.split.files + suppliers = [] + + # Split files by row ID using the same logic as Java DataEvolutionSplitGenerator.split + split_by_row_id = self._split_by_row_id(files) + + for need_merge_files in split_by_row_id: + if len(need_merge_files) == 1 or not self.read_fields: + # No need to merge fields, just create a single file reader + suppliers.append( + lambda f=need_merge_files[0]: self._create_file_reader(f) + ) + else: + suppliers.append( + lambda files=need_merge_files: self._create_union_reader(files) + ) + + return ConcatBatchReader(suppliers) + + def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: + """Split files by firstRowId for data evolution.""" + + # Sort files by firstRowId and then by maxSequenceNumber + def sort_key(file: DataFileMeta) -> tuple: + first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf') + is_blob = 1 if self._is_blob_file(file.file_name) else 0 + max_seq = file.max_sequence_number + return (first_row_id, is_blob, -max_seq) + + sorted_files = sorted(files, key=sort_key) + + # Split files by firstRowId + split_by_row_id = [] + last_row_id = -1 + check_row_id_start = 0 + current_split = [] + + for file in sorted_files: + first_row_id = file.first_row_id + if first_row_id is None: + split_by_row_id.append([file]) + 
continue + + if not self._is_blob_file(file.file_name) and first_row_id != last_row_id: + if current_split: + split_by_row_id.append(current_split) + if first_row_id < check_row_id_start: + raise ValueError( + f"There are overlapping files in the split: {files}, " + f"the wrong file is: {file}" + ) + current_split = [] + last_row_id = first_row_id + check_row_id_start = first_row_id + file.row_count + current_split.append(file) + + if current_split: + split_by_row_id.append(current_split) + + return split_by_row_id + + def _create_union_reader(self, need_merge_files: List[DataFileMeta]) -> RecordReader: + """Create a DataEvolutionFileReader for merging multiple files.""" + # Split field bunches + fields_files = self._split_field_bunches(need_merge_files) + + # Validate row counts and first row IDs + row_count = fields_files[0].row_count() + first_row_id = fields_files[0].files()[0].first_row_id + + for bunch in fields_files: + if bunch.row_count() != row_count: + raise ValueError("All files in a field merge split should have the same row count.") + if bunch.files()[0].first_row_id != first_row_id: + raise ValueError( + "All files in a field merge split should have the same first row id and could not be null." + ) + + # Create the union reader + all_read_fields = self.read_fields + file_record_readers = [None] * len(fields_files) + read_field_index = [field.id for field in all_read_fields] + + # Initialize offsets + row_offsets = [-1] * len(all_read_fields) + field_offsets = [-1] * len(all_read_fields) + + for i, bunch in enumerate(fields_files): + first_file = bunch.files()[0] + + # Get field IDs for this bunch + if self._is_blob_file(first_file.file_name): + # For blob files, we need to get the field ID from the write columns + field_ids = [self._get_field_id_from_write_cols(first_file)] + elif first_file.write_cols: + field_ids = self._get_field_ids_from_write_cols(first_file.write_cols) + else: + # For regular files, get all field IDs from the schema + field_ids = [field.id for field in self.table.fields] + + read_fields = [] + for j, read_field_id in enumerate(read_field_index): + for field_id in field_ids: + if read_field_id == field_id: + if row_offsets[j] == -1: + row_offsets[j] = i + field_offsets[j] = len(read_fields) + read_fields.append(all_read_fields[j]) + break + + if not read_fields: + file_record_readers[i] = None + else: + table_fields = self.read_fields + self.read_fields = read_fields # create reader based on read_fields + # Create reader for this bunch + if len(bunch.files()) == 1: + file_record_readers[i] = self._create_file_reader(bunch.files()[0]) + else: + # Create concatenated reader for multiple files + suppliers = [ + lambda f=file: self._create_file_reader(f) for file in bunch.files() + ] + file_record_readers[i] = ConcatRecordReader(suppliers) + self.read_fields = table_fields + + # Validate that all required fields are found + for i, field in enumerate(all_read_fields): + if row_offsets[i] == -1: + if not field.type.is_nullable(): + raise ValueError(f"Field {field} is not null but can't find any file contains it.") + + return DataEvolutionMergeReader(row_offsets, field_offsets, file_record_readers) + + def _create_file_reader(self, file: DataFileMeta) -> RecordReader: + """Create a file reader for a single file.""" + return self.file_reader_supplier(file_path=file.file_path, for_merge_read=False) + + def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: + """Split files into field bunches.""" + + fields_files = [] + 
blob_bunch_map = {} + row_count = -1 + + for file in need_merge_files: + if self._is_blob_file(file.file_name): + field_id = self._get_field_id_from_write_cols(file) + if field_id not in blob_bunch_map: + blob_bunch_map[field_id] = BlobBunch(row_count) + blob_bunch_map[field_id].add(file) + else: + # Normal file, just add it to the current merge split + fields_files.append(DataBunch(file)) + row_count = file.row_count + + fields_files.extend(blob_bunch_map.values()) + return fields_files + + def _get_field_id_from_write_cols(self, file: DataFileMeta) -> int: + """Get field ID from write columns for blob files.""" + if not file.write_cols or len(file.write_cols) == 0: + raise ValueError("Blob file must have write columns") + + # Find the field by name in the table schema + field_name = file.write_cols[0] + for field in self.table.fields: + if field.name == field_name: + return field.id + raise ValueError(f"Field {field_name} not found in table schema") + + def _get_field_ids_from_write_cols(self, write_cols: List[str]) -> List[int]: + field_ids = [] + for field_name in write_cols: + for field in self.table.fields: + if field.name == field_name: + field_ids.append(field.id) + return field_ids + + @staticmethod + def _is_blob_file(file_name: str) -> bool: + """Check if a file is a blob file based on its extension.""" + return file_name.endswith('.blob') + + def _get_all_data_fields(self): + return self.table.fields diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index b5f7a7b765..b33fb2c6ad 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -26,7 +26,7 @@ from pypaimon.read.push_down_utils import extract_predicate_to_list from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.split import Split from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead, - SplitRead) + SplitRead, DataEvolutionSplitRead) from pypaimon.schema.data_types import DataField, PyarrowFieldParser from pypaimon.table.row.offset_row import OffsetRow @@ -132,6 +132,14 @@ class TableRead: read_type=self.read_type, split=split ) + elif self.table.options.get('data-evolution.enabled', 'false').lower() == 'true': + return DataEvolutionSplitRead( + table=self.table, + predicate=self.predicate, + push_down_predicate=self.push_down_predicate, + read_type=self.read_type, + split=split + ) else: return RawFileSplitRead( table=self.table, diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py index 6a6ab9f3f8..d76725ca97 100644 --- a/paimon-python/pypaimon/read/table_scan.py +++ b/paimon-python/pypaimon/read/table_scan.py @@ -68,6 +68,7 @@ class TableScan: self.only_read_real_buckets = True if int( self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False + self.data_evolution = self.table.options.get('data-evolution.enabled', 'false').lower() == 'true' def plan(self) -> Plan: file_entries = self.plan_files() @@ -75,6 +76,8 @@ class TableScan: return Plan([]) if self.table.is_primary_key_table: splits = self._create_primary_key_splits(file_entries) + elif self.data_evolution: + splits = self._create_data_evolution_splits(file_entries) else: splits = self._create_append_only_splits(file_entries) @@ -253,6 +256,48 @@ class TableScan: "row_count": file_entry.file.row_count, }) + def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: + """ + Create data evolution 
splits for append-only tables with schema evolution. + This method groups files by firstRowId and creates splits that can handle + column merging across different schema versions. + """ + partitioned_files = defaultdict(list) + for entry in file_entries: + partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry) + + if self.idx_of_this_subtask is not None: + partitioned_files, plan_start_row, plan_end_row = self._append_only_filter_by_shard(partitioned_files) + + def weight_func(file_list: List[DataFileMeta]) -> int: + return max(sum(f.file_size for f in file_list), self.open_file_cost) + + splits = [] + for key, file_entries in partitioned_files.items(): + if not file_entries: + continue + + data_files: List[DataFileMeta] = [e.file for e in file_entries] + + # Split files by firstRowId for data evolution + split_by_row_id = self._split_by_row_id(data_files) + + # Pack the split groups for optimal split sizes + packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(split_by_row_id, weight_func, + self.target_split_size) + + # Flatten the packed files and build splits + flatten_packed_files: List[List[DataFileMeta]] = [ + [file for sub_pack in pack for file in sub_pack] + for pack in packed_files + ] + + splits += self._build_split_from_pack(flatten_packed_files, file_entries, False) + + if self.idx_of_this_subtask is not None: + self._compute_split_start_end_row(splits, plan_start_row, plan_end_row) + return splits + def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']: partitioned_files = defaultdict(list) for entry in file_entries: @@ -360,3 +405,64 @@ class TableScan: packed.append(bin_items) return packed + + @staticmethod + def _is_blob_file(file_name: str) -> bool: + """Check if a file is a blob file based on its extension.""" + return file_name.endswith('.blob') + + def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]: + """ + Split files by firstRowId for data evolution. + This method groups files that have the same firstRowId, which is essential + for handling schema evolution where files with different schemas need to be + read together to merge columns. 
+ """ + split_by_row_id = [] + + # Sort files by firstRowId and then by maxSequenceNumber + # Files with null firstRowId are treated as having Long.MIN_VALUE + def sort_key(file: DataFileMeta) -> tuple: + first_row_id = file.first_row_id if file.first_row_id is not None else float('-inf') + is_blob = 1 if self._is_blob_file(file.file_name) else 0 + # For files with same firstRowId, sort by maxSequenceNumber in descending order + # (larger sequence number means more recent data) + max_seq = file.max_sequence_number + return (first_row_id, is_blob, -max_seq) + + sorted_files = sorted(files, key=sort_key) + + # Split files by firstRowId + last_row_id = -1 + check_row_id_start = 0 + current_split = [] + + for file in sorted_files: + first_row_id = file.first_row_id + if first_row_id is None: + # Files without firstRowId are treated as individual splits + split_by_row_id.append([file]) + continue + + if not self._is_blob_file(file.file_name) and first_row_id != last_row_id: + if current_split: + split_by_row_id.append(current_split) + + # Validate that files don't overlap + if first_row_id < check_row_id_start: + file_names = [f.file_name for f in sorted_files] + raise ValueError( + f"There are overlapping files in the split: {file_names}, " + f"the wrong file is: {file.file_name}" + ) + + current_split = [] + last_row_id = first_row_id + check_row_id_start = first_row_id + file.row_count + + current_split.append(file) + + if current_split: + split_by_row_id.append(current_split) + + return split_by_row_id diff --git a/paimon-python/pypaimon/snapshot/snapshot.py b/paimon-python/pypaimon/snapshot/snapshot.py index 5bc92dcad4..96b287ab55 100644 --- a/paimon-python/pypaimon/snapshot/snapshot.py +++ b/paimon-python/pypaimon/snapshot/snapshot.py @@ -43,3 +43,4 @@ class Snapshot: changelog_record_count: Optional[int] = json_field("changelogRecordCount", default=None) watermark: Optional[int] = json_field("watermark", default=None) statistics: Optional[str] = json_field("statistics", default=None) + next_row_id: Optional[int] = json_field("nextRowId", default=None) diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py new file mode 100644 index 0000000000..90abd2f916 --- /dev/null +++ b/paimon-python/pypaimon/tests/data_evolution_test.py @@ -0,0 +1,483 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import os +import tempfile +import unittest + +import pyarrow as pa +from pypaimon import Schema, CatalogFactory + + +class DataEvolutionTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', False) + + def test_basic(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int8()), + ('f1', pa.int16()), + ]) + schema = Schema.from_pyarrow_schema(simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}) + self.catalog.create_table('default.test_row_tracking', schema, False) + table = self.catalog.get_table('default.test_row_tracking') + + # write 1 + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + expect_data = pa.Table.from_pydict({ + 'f0': [-1, 2], + 'f1': [-1001, 1002] + }, schema=simple_pa_schema) + table_write.write_arrow(expect_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # write 2 + table_write = write_builder.new_write().with_write_type(['f0']) + table_commit = write_builder.new_commit() + data2 = pa.Table.from_pydict({ + 'f0': [3, 4], + }, schema=pa.schema([ + ('f0', pa.int8()), + ])) + table_write.write_arrow(data2) + cmts = table_write.prepare_commit() + cmts[0].new_files[0].first_row_id = 0 + table_commit.commit(cmts) + table_write.close() + table_commit.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_data = table_read.to_arrow(table_scan.plan().splits()) + expect_data = pa.Table.from_pydict({ + 'f0': [3, 4], + 'f1': [-1001, 1002] + }, schema=pa.schema([ + ('f0', pa.int8()), + ('f1', pa.int16()), + ])) + self.assertEqual(actual_data, expect_data) + + def test_multiple_appends(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_multiple_appends', schema, False) + table = self.catalog.get_table('default.test_multiple_appends') + + write_builder = table.new_batch_write_builder() + + # write 100 rows: (1, "a", "b") + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + init_data = pa.Table.from_pydict({ + 'f0': [1] * 100, + 'f1': ['a'] * 100, + 'f2': ['b'] * 100, + }, schema=simple_pa_schema) + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + # append:set first_row_id = 100 to modify the row with columns write + write0 = write_builder.new_write().with_write_type(['f0', 'f1']) + write1 = write_builder.new_write().with_write_type(['f2']) + commit = write_builder.new_commit() + data0 = pa.Table.from_pydict({'f0': [2], 'f1': ['x']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + data1 = pa.Table.from_pydict({'f2': ['y']}, schema=pa.schema([('f2', pa.string())])) + write0.write_arrow(data0) + write1.write_arrow(data1) + cmts = write0.prepare_commit() + write1.prepare_commit() + for c in cmts: + for nf in c.new_files: + nf.first_row_id = 100 + commit.commit(cmts) + write0.close() + write1.close() + commit.close() + + # 
append:write (3, "c") and ("d"), set first_row_id = 101 + write0 = write_builder.new_write().with_write_type(['f0', 'f1']) + commit0 = write_builder.new_commit() + data0 = pa.Table.from_pydict({'f0': [3], 'f1': ['c']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + write0.write_arrow(data0) + cmts0 = write0.prepare_commit() + for c in cmts0: + for nf in c.new_files: + nf.first_row_id = 101 + commit0.commit(cmts0) + write0.close() + commit0.close() + + write1 = write_builder.new_write().with_write_type(['f2']) + commit1 = write_builder.new_commit() + data1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', pa.string())])) + write1.write_arrow(data1) + cmts1 = write1.prepare_commit() + for c in cmts1: + for nf in c.new_files: + nf.first_row_id = 101 + commit1.commit(cmts1) + write1.close() + commit1.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + self.assertEqual(actual.num_rows, 102) + expect = pa.Table.from_pydict({ + 'f0': [1] * 100 + [2] + [3], + 'f1': ['a'] * 100 + ['x'] + ['c'], + 'f2': ['b'] * 100 + ['y'] + ['d'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_disorder_cols_append(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_disorder_cols_append', schema, False) + table = self.catalog.get_table('default.test_disorder_cols_append') + + write_builder = table.new_batch_write_builder() + num_rows = 100 + # write 1 rows: (1, "a", "b") + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + init_data = pa.Table.from_pydict({ + 'f0': [1] * num_rows, + 'f1': ['a'] * num_rows, + 'f2': ['b'] * num_rows, + }, schema=simple_pa_schema) + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # append:set first_row_id = 0 to modify the row with columns write + write0 = write_builder.new_write().with_write_type(['f0', 'f2']) + write1 = write_builder.new_write().with_write_type(['f1']) + commit = write_builder.new_commit() + data0 = pa.Table.from_pydict({'f0': [2] * num_rows, 'f2': ['y'] * num_rows}, + schema=pa.schema([('f0', pa.int32()), ('f2', pa.string())])) + data1 = pa.Table.from_pydict({'f1': ['x'] * num_rows}, schema=pa.schema([('f1', pa.string())])) + write0.write_arrow(data0) + write1.write_arrow(data1) + cmts = write0.prepare_commit() + write1.prepare_commit() + for c in cmts: + for nf in c.new_files: + nf.first_row_id = 0 + commit.commit(cmts) + write0.close() + write1.close() + commit.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + self.assertEqual(actual.num_rows, 100) + expect = pa.Table.from_pydict({ + 'f0': [2] * num_rows, + 'f1': ['x'] * num_rows, + 'f2': ['y'] * num_rows, + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_only_some_columns(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 
'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_only_some_columns', schema, False) + table = self.catalog.get_table('default.test_only_some_columns') + + write_builder = table.new_batch_write_builder() + + # Commit 1: f0 + w0 = write_builder.new_write().with_write_type(['f0']) + c0 = write_builder.new_commit() + d0 = pa.Table.from_pydict({'f0': [1]}, schema=pa.schema([('f0', pa.int32())])) + w0.write_arrow(d0) + c0.commit(w0.prepare_commit()) + w0.close() + c0.close() + + # Commit 2: f1, first_row_id = 0 + w1 = write_builder.new_write().with_write_type(['f1']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({'f1': ['a']}, schema=pa.schema([('f1', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + for c in cmts1: + for nf in c.new_files: + nf.first_row_id = 0 + c1.commit(cmts1) + w1.close() + c1.close() + + # Commit 3: f2, first_row_id = 0 + w2 = write_builder.new_write().with_write_type(['f2']) + c2 = write_builder.new_commit() + d2 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', pa.string())])) + w2.write_arrow(d2) + cmts2 = w2.prepare_commit() + for c in cmts2: + for nf in c.new_files: + nf.first_row_id = 0 + c2.commit(cmts2) + w2.close() + c2.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expect = pa.Table.from_pydict({ + 'f0': [1], + 'f1': ['a'], + 'f2': ['b'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_null_values(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_null_values', schema, False) + table = self.catalog.get_table('default.test_null_values') + + write_builder = table.new_batch_write_builder() + + # Commit 1: some cols are null + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + + d0 = pa.Table.from_pydict({'f0': [1], 'f1': [None]}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict({'f2': [None]}, schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # Commit 2 + w1 = write_builder.new_write().with_write_type(['f2']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({'f2': ['c']}, schema=pa.schema([('f2', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + for msg in cmts1: + for nf in msg.new_files: + nf.first_row_id = 0 + c1.commit(cmts1) + w1.close() + c1.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + expect = pa.Table.from_pydict({ + 'f0': [1], + 'f1': [None], + 'f2': ['c'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + # different first_row_id append multiple times + def test_multiple_appends_different_first_row_ids(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = 
Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_multiple_appends_diff_rowid', schema, False) + table = self.catalog.get_table('default.test_multiple_appends_diff_rowid') + + write_builder = table.new_batch_write_builder() + + # commit 1 + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + d0 = pa.Table.from_pydict({'f0': [1], 'f1': ['a']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # commit 2 + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + c0 = write_builder.new_commit() + d0 = pa.Table.from_pydict({'f0': [2], 'f1': ['c']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + w0.write_arrow(d0) + cmts0 = w0.prepare_commit() + for msg in cmts0: + for nf in msg.new_files: + nf.first_row_id = 1 + c0.commit(cmts0) + w0.close() + c0.close() + + # commit 3 + w1 = write_builder.new_write().with_write_type(['f2']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + for msg in cmts1: + for nf in msg.new_files: + nf.first_row_id = 1 + c1.commit(cmts1) + w1.close() + c1.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expect = pa.Table.from_pydict({ + 'f0': [1, 2], + 'f1': ['a', 'c'], + 'f2': ['b', 'd'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_more_data(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_more_data', schema, False) + table = self.catalog.get_table('default.test_more_data') + + write_builder = table.new_batch_write_builder() + + # first commit:100k rows + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + size = 100000 + d0 = pa.Table.from_pydict({ + 'f0': list(range(size)), + 'f1': [f'a{i}' for i in range(size)], + }, schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict({ + 'f2': [f'b{i}' for i in range(size)], + }, schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # second commit:overwrite f2 to 'c{i}' + w1 = write_builder.new_write().with_write_type(['f2']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({ + 'f2': [f'c{i}' for i in range(size)], + }, schema=pa.schema([('f2', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + c1.commit(cmts1) + w1.close() + c1.close() + + read_builder = 
table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expect = pa.Table.from_pydict({ + 'f0': list(range(size)), + 'f1': [f'a{i}' for i in range(size)], + 'f2': [f'c{i}' for i in range(size)], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) diff --git a/paimon-python/pypaimon/tests/file_store_commit_test.py b/paimon-python/pypaimon/tests/file_store_commit_test.py index ac7ce95094..ab566c3e52 100644 --- a/paimon-python/pypaimon/tests/file_store_commit_test.py +++ b/paimon-python/pypaimon/tests/file_store_commit_test.py @@ -76,7 +76,10 @@ class TestFileStoreCommit(unittest.TestCase): schema_id=0, level=0, extra_files=None, - creation_time=creation_time + creation_time=creation_time, + external_path=None, + first_row_id=None, + write_cols=None ) commit_message = CommitMessage( @@ -182,7 +185,10 @@ class TestFileStoreCommit(unittest.TestCase): schema_id=0, level=0, extra_files=None, - creation_time=creation_time + creation_time=creation_time, + external_path=None, + first_row_id=None, + write_cols=None ) # File for partition 2 @@ -199,7 +205,10 @@ class TestFileStoreCommit(unittest.TestCase): schema_id=0, level=0, extra_files=None, - creation_time=creation_time + creation_time=creation_time, + external_path=None, + first_row_id=None, + write_cols=None ) commit_message_1 = CommitMessage( @@ -261,7 +270,10 @@ class TestFileStoreCommit(unittest.TestCase): schema_id=0, level=0, extra_files=None, - creation_time=creation_time + creation_time=creation_time, + external_path=None, + first_row_id=None, + write_cols=None ) commit_message = CommitMessage( @@ -389,7 +401,3 @@ class TestFileStoreCommit(unittest.TestCase): file=file )) return commit_entries - - -if __name__ == '__main__': - unittest.main() diff --git a/paimon-python/pypaimon/tests/py36/data_evolution_test.py b/paimon-python/pypaimon/tests/py36/data_evolution_test.py new file mode 100644 index 0000000000..90abd2f916 --- /dev/null +++ b/paimon-python/pypaimon/tests/py36/data_evolution_test.py @@ -0,0 +1,483 @@ +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" +import os +import tempfile +import unittest + +import pyarrow as pa +from pypaimon import Schema, CatalogFactory + + +class DataEvolutionTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({ + 'warehouse': cls.warehouse + }) + cls.catalog.create_database('default', False) + + def test_basic(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int8()), + ('f1', pa.int16()), + ]) + schema = Schema.from_pyarrow_schema(simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}) + self.catalog.create_table('default.test_row_tracking', schema, False) + table = self.catalog.get_table('default.test_row_tracking') + + # write 1 + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + expect_data = pa.Table.from_pydict({ + 'f0': [-1, 2], + 'f1': [-1001, 1002] + }, schema=simple_pa_schema) + table_write.write_arrow(expect_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # write 2 + table_write = write_builder.new_write().with_write_type(['f0']) + table_commit = write_builder.new_commit() + data2 = pa.Table.from_pydict({ + 'f0': [3, 4], + }, schema=pa.schema([ + ('f0', pa.int8()), + ])) + table_write.write_arrow(data2) + cmts = table_write.prepare_commit() + cmts[0].new_files[0].first_row_id = 0 + table_commit.commit(cmts) + table_write.close() + table_commit.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual_data = table_read.to_arrow(table_scan.plan().splits()) + expect_data = pa.Table.from_pydict({ + 'f0': [3, 4], + 'f1': [-1001, 1002] + }, schema=pa.schema([ + ('f0', pa.int8()), + ('f1', pa.int16()), + ])) + self.assertEqual(actual_data, expect_data) + + def test_multiple_appends(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_multiple_appends', schema, False) + table = self.catalog.get_table('default.test_multiple_appends') + + write_builder = table.new_batch_write_builder() + + # write 100 rows: (1, "a", "b") + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + init_data = pa.Table.from_pydict({ + 'f0': [1] * 100, + 'f1': ['a'] * 100, + 'f2': ['b'] * 100, + }, schema=simple_pa_schema) + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + # append:set first_row_id = 100 to modify the row with columns write + write0 = write_builder.new_write().with_write_type(['f0', 'f1']) + write1 = write_builder.new_write().with_write_type(['f2']) + commit = write_builder.new_commit() + data0 = pa.Table.from_pydict({'f0': [2], 'f1': ['x']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + data1 = pa.Table.from_pydict({'f2': ['y']}, schema=pa.schema([('f2', pa.string())])) + write0.write_arrow(data0) + write1.write_arrow(data1) + cmts = write0.prepare_commit() + write1.prepare_commit() + for c in cmts: + for nf in c.new_files: + nf.first_row_id = 100 + commit.commit(cmts) + write0.close() + write1.close() + commit.close() + + # 
append:write (3, "c") and ("d"), set first_row_id = 101 + write0 = write_builder.new_write().with_write_type(['f0', 'f1']) + commit0 = write_builder.new_commit() + data0 = pa.Table.from_pydict({'f0': [3], 'f1': ['c']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + write0.write_arrow(data0) + cmts0 = write0.prepare_commit() + for c in cmts0: + for nf in c.new_files: + nf.first_row_id = 101 + commit0.commit(cmts0) + write0.close() + commit0.close() + + write1 = write_builder.new_write().with_write_type(['f2']) + commit1 = write_builder.new_commit() + data1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', pa.string())])) + write1.write_arrow(data1) + cmts1 = write1.prepare_commit() + for c in cmts1: + for nf in c.new_files: + nf.first_row_id = 101 + commit1.commit(cmts1) + write1.close() + commit1.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + self.assertEqual(actual.num_rows, 102) + expect = pa.Table.from_pydict({ + 'f0': [1] * 100 + [2] + [3], + 'f1': ['a'] * 100 + ['x'] + ['c'], + 'f2': ['b'] * 100 + ['y'] + ['d'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_disorder_cols_append(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_disorder_cols_append', schema, False) + table = self.catalog.get_table('default.test_disorder_cols_append') + + write_builder = table.new_batch_write_builder() + num_rows = 100 + # write 1 rows: (1, "a", "b") + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + init_data = pa.Table.from_pydict({ + 'f0': [1] * num_rows, + 'f1': ['a'] * num_rows, + 'f2': ['b'] * num_rows, + }, schema=simple_pa_schema) + table_write.write_arrow(init_data) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + # append:set first_row_id = 0 to modify the row with columns write + write0 = write_builder.new_write().with_write_type(['f0', 'f2']) + write1 = write_builder.new_write().with_write_type(['f1']) + commit = write_builder.new_commit() + data0 = pa.Table.from_pydict({'f0': [2] * num_rows, 'f2': ['y'] * num_rows}, + schema=pa.schema([('f0', pa.int32()), ('f2', pa.string())])) + data1 = pa.Table.from_pydict({'f1': ['x'] * num_rows}, schema=pa.schema([('f1', pa.string())])) + write0.write_arrow(data0) + write1.write_arrow(data1) + cmts = write0.prepare_commit() + write1.prepare_commit() + for c in cmts: + for nf in c.new_files: + nf.first_row_id = 0 + commit.commit(cmts) + write0.close() + write1.close() + commit.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + self.assertEqual(actual.num_rows, 100) + expect = pa.Table.from_pydict({ + 'f0': [2] * num_rows, + 'f1': ['x'] * num_rows, + 'f2': ['y'] * num_rows, + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_only_some_columns(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 
'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_only_some_columns', schema, False) + table = self.catalog.get_table('default.test_only_some_columns') + + write_builder = table.new_batch_write_builder() + + # Commit 1: f0 + w0 = write_builder.new_write().with_write_type(['f0']) + c0 = write_builder.new_commit() + d0 = pa.Table.from_pydict({'f0': [1]}, schema=pa.schema([('f0', pa.int32())])) + w0.write_arrow(d0) + c0.commit(w0.prepare_commit()) + w0.close() + c0.close() + + # Commit 2: f1, first_row_id = 0 + w1 = write_builder.new_write().with_write_type(['f1']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({'f1': ['a']}, schema=pa.schema([('f1', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + for c in cmts1: + for nf in c.new_files: + nf.first_row_id = 0 + c1.commit(cmts1) + w1.close() + c1.close() + + # Commit 3: f2, first_row_id = 0 + w2 = write_builder.new_write().with_write_type(['f2']) + c2 = write_builder.new_commit() + d2 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', pa.string())])) + w2.write_arrow(d2) + cmts2 = w2.prepare_commit() + for c in cmts2: + for nf in c.new_files: + nf.first_row_id = 0 + c2.commit(cmts2) + w2.close() + c2.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expect = pa.Table.from_pydict({ + 'f0': [1], + 'f1': ['a'], + 'f2': ['b'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_null_values(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_null_values', schema, False) + table = self.catalog.get_table('default.test_null_values') + + write_builder = table.new_batch_write_builder() + + # Commit 1: some cols are null + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + + d0 = pa.Table.from_pydict({'f0': [1], 'f1': [None]}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict({'f2': [None]}, schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # Commit 2 + w1 = write_builder.new_write().with_write_type(['f2']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({'f2': ['c']}, schema=pa.schema([('f2', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + for msg in cmts1: + for nf in msg.new_files: + nf.first_row_id = 0 + c1.commit(cmts1) + w1.close() + c1.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + expect = pa.Table.from_pydict({ + 'f0': [1], + 'f1': [None], + 'f2': ['c'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + # different first_row_id append multiple times + def test_multiple_appends_different_first_row_ids(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = 
Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_multiple_appends_diff_rowid', schema, False) + table = self.catalog.get_table('default.test_multiple_appends_diff_rowid') + + write_builder = table.new_batch_write_builder() + + # commit 1 + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + d0 = pa.Table.from_pydict({'f0': [1], 'f1': ['a']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict({'f2': ['b']}, schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # commit 2 + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + c0 = write_builder.new_commit() + d0 = pa.Table.from_pydict({'f0': [2], 'f1': ['c']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + w0.write_arrow(d0) + cmts0 = w0.prepare_commit() + for msg in cmts0: + for nf in msg.new_files: + nf.first_row_id = 1 + c0.commit(cmts0) + w0.close() + c0.close() + + # commit 3 + w1 = write_builder.new_write().with_write_type(['f2']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({'f2': ['d']}, schema=pa.schema([('f2', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + for msg in cmts1: + for nf in msg.new_files: + nf.first_row_id = 1 + c1.commit(cmts1) + w1.close() + c1.close() + + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expect = pa.Table.from_pydict({ + 'f0': [1, 2], + 'f1': ['a', 'c'], + 'f2': ['b', 'd'], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) + + def test_more_data(self): + simple_pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + simple_pa_schema, + options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'} + ) + self.catalog.create_table('default.test_more_data', schema, False) + table = self.catalog.get_table('default.test_more_data') + + write_builder = table.new_batch_write_builder() + + # first commit:100k rows + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + size = 100000 + d0 = pa.Table.from_pydict({ + 'f0': list(range(size)), + 'f1': [f'a{i}' for i in range(size)], + }, schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict({ + 'f2': [f'b{i}' for i in range(size)], + }, schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + # second commit:overwrite f2 to 'c{i}' + w1 = write_builder.new_write().with_write_type(['f2']) + c1 = write_builder.new_commit() + d1 = pa.Table.from_pydict({ + 'f2': [f'c{i}' for i in range(size)], + }, schema=pa.schema([('f2', pa.string())])) + w1.write_arrow(d1) + cmts1 = w1.prepare_commit() + c1.commit(cmts1) + w1.close() + c1.close() + + read_builder = 
table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + actual = table_read.to_arrow(table_scan.plan().splits()) + + expect = pa.Table.from_pydict({ + 'f0': list(range(size)), + 'f1': [f'a{i}' for i in range(size)], + 'f2': [f'c{i}' for i in range(size)], + }, schema=simple_pa_schema) + self.assertEqual(actual, expect) diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py index 20e6a2c2d8..e6374132af 100644 --- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py +++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py @@ -550,7 +550,7 @@ class RESTReadWritePy36Test(RESTBaseTest): with self.assertRaises(ValueError) as e: table_write.write_arrow_batch(record_batch) - self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema.")) + self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema and write cols.")) def test_write_wide_table_large_data(self): logging.basicConfig(level=logging.INFO) @@ -801,7 +801,9 @@ class RESTReadWritePy36Test(RESTBaseTest): embedded_index=None, file_source=None, value_stats_cols=value_stats_cols, # This is the key field we're testing - external_path=None + external_path=None, + first_row_id=None, + write_cols=None ) # Create ManifestEntry diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py index ccf06d5597..6e9dc1ffc6 100644 --- a/paimon-python/pypaimon/tests/reader_base_test.py +++ b/paimon-python/pypaimon/tests/reader_base_test.py @@ -248,7 +248,7 @@ class ReaderBasicTest(unittest.TestCase): with self.assertRaises(ValueError) as e: table_write.write_arrow_batch(record_batch) - self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema.")) + self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema and write cols.")) def test_reader_iterator(self): read_builder = self.table.new_read_builder() @@ -609,7 +609,9 @@ class ReaderBasicTest(unittest.TestCase): embedded_index=None, file_source=None, value_stats_cols=value_stats_cols, # This is the key field we're testing - external_path=None + external_path=None, + first_row_id=None, + write_cols=None ) # Create ManifestEntry diff --git a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py index dc6c47e778..d05942a256 100644 --- a/paimon-python/pypaimon/tests/rest/rest_read_write_test.py +++ b/paimon-python/pypaimon/tests/rest/rest_read_write_test.py @@ -339,7 +339,7 @@ class RESTTableReadWriteTest(RESTBaseTest): with self.assertRaises(ValueError) as e: table_write.write_arrow_batch(record_batch) - self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema.")) + self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema and write cols.")) def test_reader_iterator(self): read_builder = self.table.new_read_builder() diff --git a/paimon-python/pypaimon/write/batch_table_write.py b/paimon-python/pypaimon/write/batch_table_write.py index c2e533cef5..a71e9c0503 100644 --- a/paimon-python/pypaimon/write/batch_table_write.py +++ b/paimon-python/pypaimon/write/batch_table_write.py @@ -67,10 +67,21 @@ class BatchTableWrite: self.batch_committed = True return self.file_store_write.prepare_commit() + def with_write_type(self, write_cols: List[str]): + for 
col in write_cols: + if col not in self.table_pyarrow_schema.names: + raise ValueError(f"Column {col} is not in table schema.") + if len(write_cols) == len(self.table_pyarrow_schema.names): + write_cols = None + self.file_store_write.write_cols = write_cols + return self + def close(self): self.file_store_write.close() - def _validate_pyarrow_schema(self, data_schema): - if data_schema != self.table_pyarrow_schema: - raise ValueError(f"Input schema isn't consistent with table schema. " - f"Table schema is: {data_schema} Input schema is: {self.table_pyarrow_schema}") + def _validate_pyarrow_schema(self, data_schema: pa.Schema): + if data_schema != self.table_pyarrow_schema and data_schema.names != self.file_store_write.write_cols: + raise ValueError(f"Input schema isn't consistent with table schema and write cols. " + f"Input schema is: {data_schema} " + f"Table schema is: {self.table_pyarrow_schema} " + f"Write cols is: {self.file_store_write.write_cols}") diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 5920f50ad8..10d2796b76 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -130,6 +130,24 @@ class FileStoreCommit: added_file_count = 0 deleted_file_count = 0 delta_record_count = 0 + # process snapshot + new_snapshot_id = self._generate_snapshot_id() + + # Check if row tracking is enabled + row_tracking_enabled = self.table.options.get('row-tracking.enabled', 'false').lower() == 'true' + + # Apply row tracking logic if enabled + next_row_id = None + if row_tracking_enabled: + # Assign snapshot ID to delta files + commit_entries = self._assign_snapshot_id(new_snapshot_id, commit_entries) + + # Get the next row ID start from the latest snapshot + first_row_id_start = self._get_next_row_id_start() + + # Assign row IDs to new files and get the next row ID for the snapshot + commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries) + for entry in commit_entries: if entry.kind == 0: added_file_count += 1 @@ -194,6 +212,7 @@ class FileStoreCommit: commit_identifier=commit_identifier, commit_kind=commit_kind, time_millis=int(time.time() * 1000), + next_row_id=next_row_id, ) # Generate partition statistics for the commit @@ -314,3 +333,59 @@ class FileStoreCommit: ) for stats in partition_stats.values() ] + + def _assign_snapshot_id(self, snapshot_id: int, commit_entries: List[ManifestEntry]) -> List[ManifestEntry]: + """Assign snapshot ID to all commit entries.""" + return [entry.assign_sequence_number(snapshot_id, snapshot_id) for entry in commit_entries] + + def _get_next_row_id_start(self) -> int: + """Get the next row ID start from the latest snapshot.""" + latest_snapshot = self.snapshot_manager.get_latest_snapshot() + if latest_snapshot and hasattr(latest_snapshot, 'next_row_id') and latest_snapshot.next_row_id is not None: + return latest_snapshot.next_row_id + return 0 + + def _assign_row_tracking_meta(self, first_row_id_start: int, commit_entries: List[ManifestEntry]): + """ + Assign row tracking metadata (first_row_id) to new files. + This follows the Java implementation logic from FileStoreCommitImpl.assignRowTrackingMeta. 
+ """ + if not commit_entries: + return commit_entries, first_row_id_start + + row_id_assigned = [] + start = first_row_id_start + blob_start = first_row_id_start + + for entry in commit_entries: + # Check if this is an append file that needs row ID assignment + if (entry.kind == 0 and # ADD kind + entry.file.file_source == "APPEND" and # APPEND file source + entry.file.first_row_id is None): # No existing first_row_id + + if self._is_blob_file(entry.file.file_name): + # Handle blob files specially + if blob_start >= start: + raise RuntimeError( + f"This is a bug, blobStart {blob_start} should be less than start {start} " + f"when assigning a blob entry file." + ) + row_count = entry.file.row_count + row_id_assigned.append(entry.assign_first_row_id(blob_start)) + blob_start += row_count + else: + # Handle regular files + row_count = entry.file.row_count + row_id_assigned.append(entry.assign_first_row_id(start)) + blob_start = start + start += row_count + else: + # For compact files or files that already have first_row_id, don't assign + row_id_assigned.append(entry) + + return row_id_assigned, start + + @staticmethod + def _is_blob_file(file_name: str) -> bool: + """Check if a file is a blob file based on its extension.""" + return file_name.endswith('.blob') diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index bcef10a4c3..841fef3a65 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -34,6 +34,7 @@ class FileStoreWrite: self.table: FileStoreTable = table self.data_writers: Dict[Tuple, DataWriter] = {} self.max_seq_numbers = self._seq_number_stats() # TODO: build this on-demand instead of on all + self.write_cols = None def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch): key = (partition, bucket) @@ -56,6 +57,7 @@ class FileStoreWrite: partition=partition, bucket=bucket, max_seq_number=self.max_seq_numbers.get((partition, bucket), 1), + write_cols=self.write_cols ) def prepare_commit(self) -> List[CommitMessage]: diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index cc0fc944a1..ad6e327c89 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -26,6 +26,7 @@ from typing import Dict, List, Optional, Tuple from pypaimon.common.core_options import CoreOptions from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats +from pypaimon.schema.data_types import PyarrowFieldParser from pypaimon.table.bucket_mode import BucketMode from pypaimon.table.row.generic_row import GenericRow @@ -33,7 +34,8 @@ from pypaimon.table.row.generic_row import GenericRow class DataWriter(ABC): """Base class for data writers that handle PyArrow tables directly.""" - def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int): + def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, + write_cols: Optional[List[str]] = None): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table @@ -55,6 +57,7 @@ class DataWriter(ABC): self.pending_data: Optional[pa.Table] = None self.committed_files: List[DataFileMeta] = [] + self.write_cols = write_cols def write(self, data: pa.RecordBatch): processed_data = self._process_data(data) @@ -126,11 +129,13 @@ class DataWriter(ABC): max_key = 
[col.to_pylist()[0] for col in max_key_row_batch.columns] # key stats & value stats + data_fields = self.table.fields if self.table.is_primary_key_table \ + else PyarrowFieldParser.to_paimon_schema(data.schema) column_stats = { field.name: self._get_column_stats(data, field.name) - for field in self.table.table_schema.fields + for field in data_fields } - all_fields = self.table.table_schema.fields + all_fields = data_fields min_value_stats = [column_stats[field.name]['min_values'] for field in all_fields] max_value_stats = [column_stats[field.name]['max_values'] for field in all_fields] value_null_counts = [column_stats[field.name]['null_counts'] for field in all_fields] @@ -156,8 +161,8 @@ class DataWriter(ABC): key_null_counts, ), value_stats=SimpleStats( - GenericRow(min_value_stats, self.table.table_schema.fields), - GenericRow(max_value_stats, self.table.table_schema.fields), + GenericRow(min_value_stats, data_fields), + GenericRow(max_value_stats, data_fields), value_null_counts, ), min_sequence_number=min_seq, @@ -167,7 +172,12 @@ class DataWriter(ABC): extra_files=[], creation_time=datetime.now(), delete_row_count=0, - value_stats_cols=None, # None means all columns have statistics + file_source="APPEND", + value_stats_cols=None, # None means all columns in the data have statistics + external_path=None, + first_row_id=None, + write_cols=self.write_cols, + # None means all columns in the table have been written file_path=str(file_path), ))
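
Note for readers of the new data_evolution tests above: they all follow the same pattern, namely write disjoint column groups through separate writers configured with with_write_type, pin first_row_id so the groups describe the same rows, then read back a merged view. A condensed sketch of that pattern (assuming `catalog` is an already-initialized catalog, as `self.catalog` is in the tests; the table name 'default.demo' and the Schema import path are illustrative assumptions):

    import pyarrow as pa
    from pypaimon import Schema  # assumed import path

    pa_schema = pa.schema([('f0', pa.int32()), ('f1', pa.string()), ('f2', pa.string())])
    schema = Schema.from_pyarrow_schema(
        pa_schema,
        options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'})
    catalog.create_table('default.demo', schema, False)
    table = catalog.get_table('default.demo')

    write_builder = table.new_batch_write_builder()

    # Each writer covers a disjoint subset of the table's columns.
    w01 = write_builder.new_write().with_write_type(['f0', 'f1'])
    w2 = write_builder.new_write().with_write_type(['f2'])
    commit = write_builder.new_commit()

    w01.write_arrow(pa.Table.from_pydict(
        {'f0': [1], 'f1': ['a']},
        schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])))
    w2.write_arrow(pa.Table.from_pydict(
        {'f2': ['b']},
        schema=pa.schema([('f2', pa.string())])))

    # Pin first_row_id so both column groups are aligned on the same rows; without
    # this, FileStoreCommit would assign fresh, non-overlapping row-id ranges.
    cmts = w01.prepare_commit() + w2.prepare_commit()
    for msg in cmts:
        for nf in msg.new_files:
            nf.first_row_id = 0
    commit.commit(cmts)
    w01.close()
    w2.close()
    commit.close()

    # Reading merges the column groups back into full rows:
    # {'f0': [1], 'f1': ['a'], 'f2': ['b']}
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    merged = read_builder.new_read().to_arrow(splits)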
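When first_row_id is not pre-set, the new FileStoreCommit._assign_row_tracking_meta gives every added APPEND file a contiguous row-id range starting at the previous snapshot's next_row_id, and the value it stops at becomes the new snapshot's next_row_id. A rough standalone illustration of that rule, not the committed implementation (simplified: plain dicts stand in for ManifestEntry/DataFileMeta, and blob-file handling is omitted):

    def assign_first_row_ids(entries, next_row_id_start):
        # entries: the ADD manifest entries of one commit, in commit order.
        start = next_row_id_start
        assigned = []
        for e in entries:
            needs_id = (e['kind'] == 0                    # ADD entry
                        and e['file_source'] == 'APPEND'  # appended data file
                        and e['first_row_id'] is None)    # not pinned by the writer
            if needs_id:
                e = dict(e, first_row_id=start)
                start += e['row_count']                   # ranges never overlap
            assigned.append(e)
        return assigned, start                            # `start` becomes snapshot.next_row_id

    entries = [
        {'kind': 0, 'file_source': 'APPEND', 'first_row_id': None, 'row_count': 100},
        {'kind': 0, 'file_source': 'APPEND', 'first_row_id': 0,    'row_count': 100},  # pinned, kept as-is
        {'kind': 0, 'file_source': 'APPEND', 'first_row_id': None, 'row_count': 5},
    ]
    assigned, next_row_id = assign_first_row_ids(entries, 100)
    assert [e['first_row_id'] for e in assigned] == [100, 0, 200]
    assert next_row_id == 205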

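With write_cols set, BatchTableWrite now accepts an input batch whose schema matches either the full table schema or exactly the declared write columns; anything else is rejected with the updated error message, and passing every column to with_write_type resets write_cols to None (a full-schema write). A minimal sketch of that check under those assumptions (the real check lives in BatchTableWrite._validate_pyarrow_schema and reads write_cols from FileStoreWrite):

    import pyarrow as pa

    table_schema = pa.schema([('f0', pa.int32()), ('f1', pa.string()), ('f2', pa.string())])
    write_cols = ['f0', 'f1']  # as set via with_write_type(['f0', 'f1'])

    def validate(data_schema: pa.Schema):
        # Accept the full table schema or exactly the declared write columns.
        if data_schema != table_schema and data_schema.names != write_cols:
            raise ValueError("Input schema isn't consistent with table schema and write cols.")

    validate(pa.schema([('f0', pa.int32()), ('f1', pa.string())]))  # ok: matches write_cols
    validate(table_schema)                                          # ok: full table schema
    # validate(pa.schema([('f2', pa.string())]))                    # would raise ValueError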