This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.3 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 139f65cc8f10d478b8f0545382146f7e1cbc1b8c Author: ChengHui Chen <[email protected]> AuthorDate: Mon Oct 20 21:09:10 2025 +0800 [Python] clean code for pypaimon (#6433) --- paimon-python/pypaimon/common/predicate.py | 231 +++++++-------------- paimon-python/pypaimon/read/split_read.py | 21 +- paimon-python/pypaimon/read/table_read.py | 18 +- .../pypaimon/write/writer/data_blob_writer.py | 2 +- paimon-python/pypaimon/write/writer/data_writer.py | 2 +- 5 files changed, 95 insertions(+), 179 deletions(-) diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py index 4ca8644f6e..a245bb8e1e 100644 --- a/paimon-python/pypaimon/common/predicate.py +++ b/paimon-python/pypaimon/common/predicate.py @@ -50,95 +50,33 @@ class Predicate: literals=literals) def test(self, record: InternalRow) -> bool: - if self.method == 'equal': - return record.get_field(self.index) == self.literals[0] - elif self.method == 'notEqual': - return record.get_field(self.index) != self.literals[0] - elif self.method == 'lessThan': - return record.get_field(self.index) < self.literals[0] - elif self.method == 'lessOrEqual': - return record.get_field(self.index) <= self.literals[0] - elif self.method == 'greaterThan': - return record.get_field(self.index) > self.literals[0] - elif self.method == 'greaterOrEqual': - return record.get_field(self.index) >= self.literals[0] - elif self.method == 'isNull': - return record.get_field(self.index) is None - elif self.method == 'isNotNull': - return record.get_field(self.index) is not None - elif self.method == 'startsWith': - field_value = record.get_field(self.index) - if not isinstance(field_value, str): - return False - return field_value.startswith(self.literals[0]) - elif self.method == 'endsWith': - field_value = record.get_field(self.index) - if not isinstance(field_value, str): - return False - return field_value.endswith(self.literals[0]) - elif self.method == 'contains': - field_value = record.get_field(self.index) - if not isinstance(field_value, str): - return False - return self.literals[0] in field_value - elif self.method == 'in': - return record.get_field(self.index) in self.literals - elif self.method == 'notIn': - return record.get_field(self.index) not in self.literals - elif self.method == 'between': - field_value = record.get_field(self.index) - return self.literals[0] <= field_value <= self.literals[1] - elif self.method == 'and': - return all(p.test(record) for p in self.literals) - elif self.method == 'or': - t = any(p.test(record) for p in self.literals) - return t - else: - raise ValueError("Unsupported predicate method: {}".format(self.method)) - - def test_by_value(self, value: Any) -> bool: if self.method == 'and': - return all(p.test_by_value(value) for p in self.literals) + return all(p.test(record) for p in self.literals) if self.method == 'or': - t = any(p.test_by_value(value) for p in self.literals) + t = any(p.test(record) for p in self.literals) return t - if self.method == 'equal': - return value == self.literals[0] - if self.method == 'notEqual': - return value != self.literals[0] - if self.method == 'lessThan': - return value < self.literals[0] - if self.method == 'lessOrEqual': - return value <= self.literals[0] - if self.method == 'greaterThan': - return value > self.literals[0] - if self.method == 'greaterOrEqual': - return value >= self.literals[0] - if self.method == 'isNull': - return value is None - if self.method == 'isNotNull': - return value is not None - if self.method == 'startsWith': - if not isinstance(value, str): - return False - return value.startswith(self.literals[0]) - if self.method == 'endsWith': - if not isinstance(value, str): - return False - return value.endswith(self.literals[0]) - if self.method == 'contains': - if not isinstance(value, str): - return False - return self.literals[0] in value - if self.method == 'in': - return value in self.literals - if self.method == 'notIn': - return value not in self.literals - if self.method == 'between': - return self.literals[0] <= value <= self.literals[1] - - raise ValueError("Unsupported predicate method: {}".format(self.method)) + dispatch = { + 'equal': lambda val, literals: val == literals[0], + 'notEqual': lambda val, literals: val != literals[0], + 'lessThan': lambda val, literals: val < literals[0], + 'lessOrEqual': lambda val, literals: val <= literals[0], + 'greaterThan': lambda val, literals: val > literals[0], + 'greaterOrEqual': lambda val, literals: val >= literals[0], + 'isNull': lambda val, literals: val is None, + 'isNotNull': lambda val, literals: val is not None, + 'startsWith': lambda val, literals: isinstance(val, str) and val.startswith(literals[0]), + 'endsWith': lambda val, literals: isinstance(val, str) and val.endswith(literals[0]), + 'contains': lambda val, literals: isinstance(val, str) and literals[0] in val, + 'in': lambda val, literals: val in literals, + 'notIn': lambda val, literals: val not in literals, + 'between': lambda val, literals: literals[0] <= val <= literals[1], + } + func = dispatch.get(self.method) + if func: + field_value = record.get_field(self.index) + return func(field_value, self.literals) + raise ValueError(f"Unsupported predicate method: {self.method}") def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool: return self.test_by_stats({ @@ -169,66 +107,39 @@ class Predicate: max_value = stat["max_values"][self.field] if min_value is None or max_value is None or (null_count is not None and null_count == row_count): - return False - - if self.method == 'equal': - return min_value <= self.literals[0] <= max_value - if self.method == 'notEqual': - return not (min_value == self.literals[0] == max_value) - if self.method == 'lessThan': - return self.literals[0] > min_value - if self.method == 'lessOrEqual': - return self.literals[0] >= min_value - if self.method == 'greaterThan': - return self.literals[0] < max_value - if self.method == 'greaterOrEqual': - return self.literals[0] <= max_value - if self.method == 'startsWith': - if not isinstance(min_value, str) or not isinstance(max_value, str): - raise RuntimeError("startsWith predicate on non-str field") - return ((min_value.startswith(self.literals[0]) or min_value < self.literals[0]) - and (max_value.startswith(self.literals[0]) or max_value > self.literals[0])) - if self.method == 'endsWith': + # invalid stats, skip validation return True - if self.method == 'contains': - return True - if self.method == 'in': - for literal in self.literals: - if min_value <= literal <= max_value: - return True - return False - if self.method == 'notIn': - for literal in self.literals: - if min_value == literal == max_value: - return False - return True - if self.method == 'between': - return self.literals[0] <= max_value and self.literals[1] >= min_value - else: - raise ValueError("Unsupported predicate method: {}".format(self.method)) + + dispatch = { + 'equal': lambda literals: min_value <= literals[0] <= max_value, + 'notEqual': lambda literals: not (min_value == literals[0] == max_value), + 'lessThan': lambda literals: literals[0] > min_value, + 'lessOrEqual': lambda literals: literals[0] >= min_value, + 'greaterThan': lambda literals: literals[0] < max_value, + 'greaterOrEqual': lambda literals: literals[0] <= max_value, + 'in': lambda literals: any(min_value <= l <= max_value for l in literals), + 'notIn': lambda literals: not any(min_value == l == max_value for l in literals), + 'between': lambda literals: literals[0] <= max_value and literals[1] >= min_value, + 'startsWith': lambda literals: ((isinstance(min_value, str) and isinstance(max_value, str)) and + ((min_value.startswith(literals[0]) or min_value < literals[0]) and + (max_value.startswith(literals[0]) or max_value > literals[0]))), + 'endsWith': lambda literals: True, + 'contains': lambda literals: True, + } + func = dispatch.get(self.method) + if func: + return func(self.literals) + raise ValueError(f"Unsupported predicate method: {self.method}") def to_arrow(self) -> Any: - if self.method == 'equal': - return pyarrow_dataset.field(self.field) == self.literals[0] - elif self.method == 'notEqual': - return pyarrow_dataset.field(self.field) != self.literals[0] - elif self.method == 'lessThan': - return pyarrow_dataset.field(self.field) < self.literals[0] - elif self.method == 'lessOrEqual': - return pyarrow_dataset.field(self.field) <= self.literals[0] - elif self.method == 'greaterThan': - return pyarrow_dataset.field(self.field) > self.literals[0] - elif self.method == 'greaterOrEqual': - return pyarrow_dataset.field(self.field) >= self.literals[0] - elif self.method == 'isNull': - return pyarrow_dataset.field(self.field).is_null() - elif self.method == 'isNotNull': - return pyarrow_dataset.field(self.field).is_valid() - elif self.method == 'in': - return pyarrow_dataset.field(self.field).isin(self.literals) - elif self.method == 'notIn': - return ~pyarrow_dataset.field(self.field).isin(self.literals) - elif self.method == 'startsWith': + if self.method == 'and': + return reduce(lambda x, y: x & y, + [p.to_arrow() for p in self.literals]) + if self.method == 'or': + return reduce(lambda x, y: x | y, + [p.to_arrow() for p in self.literals]) + + if self.method == 'startsWith': pattern = self.literals[0] # For PyArrow compatibility - improved approach try: @@ -240,7 +151,7 @@ class Predicate: except Exception: # Fallback to True return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null() - elif self.method == 'endsWith': + if self.method == 'endsWith': pattern = self.literals[0] # For PyArrow compatibility try: @@ -252,7 +163,7 @@ class Predicate: except Exception: # Fallback to True return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null() - elif self.method == 'contains': + if self.method == 'contains': pattern = self.literals[0] # For PyArrow compatibility try: @@ -264,14 +175,24 @@ class Predicate: except Exception: # Fallback to True return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null() - elif self.method == 'between': - return (pyarrow_dataset.field(self.field) >= self.literals[0]) & \ - (pyarrow_dataset.field(self.field) <= self.literals[1]) - elif self.method == 'and': - return reduce(lambda x, y: x & y, - [p.to_arrow() for p in self.literals]) - elif self.method == 'or': - return reduce(lambda x, y: x | y, - [p.to_arrow() for p in self.literals]) - else: - raise ValueError("Unsupported predicate method: {}".format(self.method)) + + field = pyarrow_dataset.field(self.field) + dispatch = { + 'equal': lambda literals: field == literals[0], + 'notEqual': lambda literals: field != literals[0], + 'lessThan': lambda literals: field < literals[0], + 'lessOrEqual': lambda literals: field <= literals[0], + 'greaterThan': lambda literals: field > literals[0], + 'greaterOrEqual': lambda literals: field >= literals[0], + 'isNull': lambda literals: field.is_null(), + 'isNotNull': lambda literals: field.is_valid(), + 'in': lambda literals: field.isin(literals), + 'notIn': lambda literals: ~field.isin(literals), + 'between': lambda literals: (field >= literals[0]) & (field <= literals[1]), + } + + func = dispatch.get(self.method) + if func: + return func(self.literals) + + raise ValueError("Unsupported predicate method: {}".format(self.method)) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 5a8bc4825b..6b93b51d15 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -19,13 +19,14 @@ import os from abc import ABC, abstractmethod from functools import partial -from typing import List, Optional, Tuple +from typing import List, Optional, Tuple, Any from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.read.interval_partition import IntervalPartition, SortedRun from pypaimon.read.partition_info import PartitionInfo +from pypaimon.read.push_down_utils import trim_predicate_by_fields from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader, MergeAllBatchReader from pypaimon.read.reader.concat_record_reader import ConcatRecordReader from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader @@ -54,21 +55,31 @@ NULL_FIELD_INDEX = -1 class SplitRead(ABC): """Abstract base class for split reading operations.""" - def __init__(self, table, predicate: Optional[Predicate], push_down_predicate, - read_type: List[DataField], split: Split): + def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split): from pypaimon.table.file_store_table import FileStoreTable self.table: FileStoreTable = table self.predicate = predicate - self.push_down_predicate = push_down_predicate + self.push_down_predicate = self._push_down_predicate() self.split = split self.value_arity = len(read_type) - self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()] + self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys() self.read_fields = read_type if isinstance(self, MergeFileSplitRead): self.read_fields = self._create_key_value_fields(read_type) + def _push_down_predicate(self) -> Any: + if self.predicate is None: + return None + elif self.table.is_primary_key_table: + pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys) + if not pk_predicate: + return None + return pk_predicate.to_arrow() + else: + return self.predicate.to_arrow() + @abstractmethod def create_reader(self) -> RecordReader: """Create a record reader for the given split.""" diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 061303862c..31545e4ea4 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -15,14 +15,13 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -from typing import Any, Iterator, List, Optional +from typing import Iterator, List, Optional import pandas import pyarrow from pypaimon.common.core_options import CoreOptions from pypaimon.common.predicate import Predicate -from pypaimon.read.push_down_utils import trim_predicate_by_fields from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.split import Split from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead, @@ -39,7 +38,6 @@ class TableRead: self.table: FileStoreTable = table self.predicate = predicate - self.push_down_predicate = self._push_down_predicate() self.read_type = read_type def to_iterator(self, splits: List[Split]) -> Iterator: @@ -108,23 +106,11 @@ class TableRead: return ray.data.from_arrow(self.to_arrow(splits)) - def _push_down_predicate(self) -> Any: - if self.predicate is None: - return None - elif self.table.is_primary_key_table: - pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys) - if not pk_predicate: - return None - return pk_predicate.to_arrow() - else: - return self.predicate.to_arrow() - def _create_split_read(self, split: Split) -> SplitRead: if self.table.is_primary_key_table and not split.raw_convertible: return MergeFileSplitRead( table=self.table, predicate=self.predicate, - push_down_predicate=self.push_down_predicate, read_type=self.read_type, split=split ) @@ -132,7 +118,6 @@ class TableRead: return DataEvolutionSplitRead( table=self.table, predicate=self.predicate, - push_down_predicate=self.push_down_predicate, read_type=self.read_type, split=split ) @@ -140,7 +125,6 @@ class TableRead: return RawFileSplitRead( table=self.table, predicate=self.predicate, - push_down_predicate=self.push_down_predicate, read_type=self.read_type, split=split ) diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py index e34b9a5701..b711d2e695 100644 --- a/paimon-python/pypaimon/write/writer/data_blob_writer.py +++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py @@ -83,7 +83,7 @@ class DataBlobWriter(DataWriter): self.blob_column_name = self._get_blob_columns_from_schema() # Split schema into normal and blob columns - all_column_names = [field.name for field in self.table.table_schema.fields] + all_column_names = self.table.field_names self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name] self.write_cols = self.normal_column_names diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 6ee92082db..3515991933 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -44,7 +44,7 @@ class DataWriter(ABC): self.file_io = self.table.file_io self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields() - self.trimmed_primary_key = [field.name for field in self.trimmed_primary_key_fields] + self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys() options = self.table.options self.target_file_size = 256 * 1024 * 1024
