This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fa3c6c080a [Python] clean code for pypaimon (#6433)
fa3c6c080a is described below
commit fa3c6c080aeed47b12d56b066bb75586aeb45359
Author: ChengHui Chen <[email protected]>
AuthorDate: Mon Oct 20 21:09:10 2025 +0800
[Python] clean code for pypaimon (#6433)
---
paimon-python/pypaimon/common/predicate.py | 231 +++++++--------------
paimon-python/pypaimon/read/split_read.py | 21 +-
paimon-python/pypaimon/read/table_read.py | 18 +-
.../pypaimon/write/writer/data_blob_writer.py | 2 +-
paimon-python/pypaimon/write/writer/data_writer.py | 2 +-
5 files changed, 95 insertions(+), 179 deletions(-)
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index 4ca8644f6e..a245bb8e1e 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -50,95 +50,33 @@ class Predicate:
literals=literals)
def test(self, record: InternalRow) -> bool:
- if self.method == 'equal':
- return record.get_field(self.index) == self.literals[0]
- elif self.method == 'notEqual':
- return record.get_field(self.index) != self.literals[0]
- elif self.method == 'lessThan':
- return record.get_field(self.index) < self.literals[0]
- elif self.method == 'lessOrEqual':
- return record.get_field(self.index) <= self.literals[0]
- elif self.method == 'greaterThan':
- return record.get_field(self.index) > self.literals[0]
- elif self.method == 'greaterOrEqual':
- return record.get_field(self.index) >= self.literals[0]
- elif self.method == 'isNull':
- return record.get_field(self.index) is None
- elif self.method == 'isNotNull':
- return record.get_field(self.index) is not None
- elif self.method == 'startsWith':
- field_value = record.get_field(self.index)
- if not isinstance(field_value, str):
- return False
- return field_value.startswith(self.literals[0])
- elif self.method == 'endsWith':
- field_value = record.get_field(self.index)
- if not isinstance(field_value, str):
- return False
- return field_value.endswith(self.literals[0])
- elif self.method == 'contains':
- field_value = record.get_field(self.index)
- if not isinstance(field_value, str):
- return False
- return self.literals[0] in field_value
- elif self.method == 'in':
- return record.get_field(self.index) in self.literals
- elif self.method == 'notIn':
- return record.get_field(self.index) not in self.literals
- elif self.method == 'between':
- field_value = record.get_field(self.index)
- return self.literals[0] <= field_value <= self.literals[1]
- elif self.method == 'and':
- return all(p.test(record) for p in self.literals)
- elif self.method == 'or':
- t = any(p.test(record) for p in self.literals)
- return t
- else:
- raise ValueError("Unsupported predicate method: {}".format(self.method))
-
- def test_by_value(self, value: Any) -> bool:
if self.method == 'and':
- return all(p.test_by_value(value) for p in self.literals)
+ return all(p.test(record) for p in self.literals)
if self.method == 'or':
- t = any(p.test_by_value(value) for p in self.literals)
+ t = any(p.test(record) for p in self.literals)
return t
- if self.method == 'equal':
- return value == self.literals[0]
- if self.method == 'notEqual':
- return value != self.literals[0]
- if self.method == 'lessThan':
- return value < self.literals[0]
- if self.method == 'lessOrEqual':
- return value <= self.literals[0]
- if self.method == 'greaterThan':
- return value > self.literals[0]
- if self.method == 'greaterOrEqual':
- return value >= self.literals[0]
- if self.method == 'isNull':
- return value is None
- if self.method == 'isNotNull':
- return value is not None
- if self.method == 'startsWith':
- if not isinstance(value, str):
- return False
- return value.startswith(self.literals[0])
- if self.method == 'endsWith':
- if not isinstance(value, str):
- return False
- return value.endswith(self.literals[0])
- if self.method == 'contains':
- if not isinstance(value, str):
- return False
- return self.literals[0] in value
- if self.method == 'in':
- return value in self.literals
- if self.method == 'notIn':
- return value not in self.literals
- if self.method == 'between':
- return self.literals[0] <= value <= self.literals[1]
-
- raise ValueError("Unsupported predicate method: {}".format(self.method))
+ dispatch = {
+ 'equal': lambda val, literals: val == literals[0],
+ 'notEqual': lambda val, literals: val != literals[0],
+ 'lessThan': lambda val, literals: val < literals[0],
+ 'lessOrEqual': lambda val, literals: val <= literals[0],
+ 'greaterThan': lambda val, literals: val > literals[0],
+ 'greaterOrEqual': lambda val, literals: val >= literals[0],
+ 'isNull': lambda val, literals: val is None,
+ 'isNotNull': lambda val, literals: val is not None,
+ 'startsWith': lambda val, literals: isinstance(val, str) and val.startswith(literals[0]),
+ 'endsWith': lambda val, literals: isinstance(val, str) and val.endswith(literals[0]),
+ 'contains': lambda val, literals: isinstance(val, str) and literals[0] in val,
+ 'in': lambda val, literals: val in literals,
+ 'notIn': lambda val, literals: val not in literals,
+ 'between': lambda val, literals: literals[0] <= val <= literals[1],
+ }
+ func = dispatch.get(self.method)
+ if func:
+ field_value = record.get_field(self.index)
+ return func(field_value, self.literals)
+ raise ValueError(f"Unsupported predicate method: {self.method}")
def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
return self.test_by_stats({
@@ -169,66 +107,39 @@ class Predicate:
max_value = stat["max_values"][self.field]
if min_value is None or max_value is None or (null_count is not None and null_count == row_count):
- return False
-
- if self.method == 'equal':
- return min_value <= self.literals[0] <= max_value
- if self.method == 'notEqual':
- return not (min_value == self.literals[0] == max_value)
- if self.method == 'lessThan':
- return self.literals[0] > min_value
- if self.method == 'lessOrEqual':
- return self.literals[0] >= min_value
- if self.method == 'greaterThan':
- return self.literals[0] < max_value
- if self.method == 'greaterOrEqual':
- return self.literals[0] <= max_value
- if self.method == 'startsWith':
- if not isinstance(min_value, str) or not isinstance(max_value, str):
- raise RuntimeError("startsWith predicate on non-str field")
- return ((min_value.startswith(self.literals[0]) or min_value < self.literals[0])
- and (max_value.startswith(self.literals[0]) or max_value > self.literals[0]))
- if self.method == 'endsWith':
+ # invalid stats, skip validation
return True
- if self.method == 'contains':
- return True
- if self.method == 'in':
- for literal in self.literals:
- if min_value <= literal <= max_value:
- return True
- return False
- if self.method == 'notIn':
- for literal in self.literals:
- if min_value == literal == max_value:
- return False
- return True
- if self.method == 'between':
- return self.literals[0] <= max_value and self.literals[1] >= min_value
- else:
- raise ValueError("Unsupported predicate method: {}".format(self.method))
+
+ dispatch = {
+ 'equal': lambda literals: min_value <= literals[0] <= max_value,
+ 'notEqual': lambda literals: not (min_value == literals[0] == max_value),
+ 'lessThan': lambda literals: literals[0] > min_value,
+ 'lessOrEqual': lambda literals: literals[0] >= min_value,
+ 'greaterThan': lambda literals: literals[0] < max_value,
+ 'greaterOrEqual': lambda literals: literals[0] <= max_value,
+ 'in': lambda literals: any(min_value <= l <= max_value for l in literals),
+ 'notIn': lambda literals: not any(min_value == l == max_value for l in literals),
+ 'between': lambda literals: literals[0] <= max_value and literals[1] >= min_value,
+ 'startsWith': lambda literals: ((isinstance(min_value, str) and isinstance(max_value, str)) and
+ ((min_value.startswith(literals[0]) or min_value < literals[0]) and
+ (max_value.startswith(literals[0]) or max_value > literals[0]))),
+ 'endsWith': lambda literals: True,
+ 'contains': lambda literals: True,
+ }
+ func = dispatch.get(self.method)
+ if func:
+ return func(self.literals)
+ raise ValueError(f"Unsupported predicate method: {self.method}")
def to_arrow(self) -> Any:
- if self.method == 'equal':
- return pyarrow_dataset.field(self.field) == self.literals[0]
- elif self.method == 'notEqual':
- return pyarrow_dataset.field(self.field) != self.literals[0]
- elif self.method == 'lessThan':
- return pyarrow_dataset.field(self.field) < self.literals[0]
- elif self.method == 'lessOrEqual':
- return pyarrow_dataset.field(self.field) <= self.literals[0]
- elif self.method == 'greaterThan':
- return pyarrow_dataset.field(self.field) > self.literals[0]
- elif self.method == 'greaterOrEqual':
- return pyarrow_dataset.field(self.field) >= self.literals[0]
- elif self.method == 'isNull':
- return pyarrow_dataset.field(self.field).is_null()
- elif self.method == 'isNotNull':
- return pyarrow_dataset.field(self.field).is_valid()
- elif self.method == 'in':
- return pyarrow_dataset.field(self.field).isin(self.literals)
- elif self.method == 'notIn':
- return ~pyarrow_dataset.field(self.field).isin(self.literals)
- elif self.method == 'startsWith':
+ if self.method == 'and':
+ return reduce(lambda x, y: x & y,
+ [p.to_arrow() for p in self.literals])
+ if self.method == 'or':
+ return reduce(lambda x, y: x | y,
+ [p.to_arrow() for p in self.literals])
+
+ if self.method == 'startsWith':
pattern = self.literals[0]
# For PyArrow compatibility - improved approach
try:
@@ -240,7 +151,7 @@ class Predicate:
except Exception:
# Fallback to True
return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
- elif self.method == 'endsWith':
+ if self.method == 'endsWith':
pattern = self.literals[0]
# For PyArrow compatibility
try:
@@ -252,7 +163,7 @@ class Predicate:
except Exception:
# Fallback to True
return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
- elif self.method == 'contains':
+ if self.method == 'contains':
pattern = self.literals[0]
# For PyArrow compatibility
try:
@@ -264,14 +175,24 @@ class Predicate:
except Exception:
# Fallback to True
return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
- elif self.method == 'between':
- return (pyarrow_dataset.field(self.field) >= self.literals[0]) & \
- (pyarrow_dataset.field(self.field) <= self.literals[1])
- elif self.method == 'and':
- return reduce(lambda x, y: x & y,
- [p.to_arrow() for p in self.literals])
- elif self.method == 'or':
- return reduce(lambda x, y: x | y,
- [p.to_arrow() for p in self.literals])
- else:
- raise ValueError("Unsupported predicate method: {}".format(self.method))
+
+ field = pyarrow_dataset.field(self.field)
+ dispatch = {
+ 'equal': lambda literals: field == literals[0],
+ 'notEqual': lambda literals: field != literals[0],
+ 'lessThan': lambda literals: field < literals[0],
+ 'lessOrEqual': lambda literals: field <= literals[0],
+ 'greaterThan': lambda literals: field > literals[0],
+ 'greaterOrEqual': lambda literals: field >= literals[0],
+ 'isNull': lambda literals: field.is_null(),
+ 'isNotNull': lambda literals: field.is_valid(),
+ 'in': lambda literals: field.isin(literals),
+ 'notIn': lambda literals: ~field.isin(literals),
+ 'between': lambda literals: (field >= literals[0]) & (field <= literals[1]),
+ }
+
+ func = dispatch.get(self.method)
+ if func:
+ return func(self.literals)
+
+ raise ValueError("Unsupported predicate method: {}".format(self.method))
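
The predicate.py rewrite above collapses three long if/elif chains (test, test_by_stats, to_arrow) into dictionary dispatch and folds the removed test_by_value into test. For readers unfamiliar with the pattern, a minimal standalone sketch follows (illustrative names only, not the pypaimon Predicate API):

    # Minimal sketch of dictionary dispatch for predicate evaluation.
    # Illustrative only; the real logic lives in pypaimon/common/predicate.py.
    from typing import Any, Callable, Dict, List

    DISPATCH: Dict[str, Callable[[Any, List[Any]], bool]] = {
        'equal': lambda val, lits: val == lits[0],
        'lessThan': lambda val, lits: val < lits[0],
        'in': lambda val, lits: val in lits,
        'between': lambda val, lits: lits[0] <= val <= lits[1],
    }

    def evaluate(method: str, value: Any, literals: List[Any]) -> bool:
        func = DISPATCH.get(method)
        if func is None:
            raise ValueError(f"Unsupported predicate method: {method}")
        return func(value, literals)

    assert evaluate('between', 5, [1, 10])      # value in range
    assert not evaluate('equal', 'a', ['b'])    # values differ

As in the original chains, an unknown method still raises ValueError.
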
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5a8bc4825b..6b93b51d15 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,13 +19,14 @@
import os
from abc import ABC, abstractmethod
from functools import partial
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Any
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
+from pypaimon.read.push_down_utils import trim_predicate_by_fields
from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, ShardBatchReader, MergeAllBatchReader
from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
@@ -54,21 +55,31 @@ NULL_FIELD_INDEX = -1
class SplitRead(ABC):
"""Abstract base class for split reading operations."""
- def __init__(self, table, predicate: Optional[Predicate], push_down_predicate,
- read_type: List[DataField], split: Split):
+ def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField], split: Split):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.predicate = predicate
- self.push_down_predicate = push_down_predicate
+ self.push_down_predicate = self._push_down_predicate()
self.split = split
self.value_arity = len(read_type)
- self.trimmed_primary_key = [field.name for field in self.table.table_schema.get_trimmed_primary_key_fields()]
+ self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys()
self.read_fields = read_type
if isinstance(self, MergeFileSplitRead):
self.read_fields = self._create_key_value_fields(read_type)
+ def _push_down_predicate(self) -> Any:
+ if self.predicate is None:
+ return None
+ elif self.table.is_primary_key_table:
+ pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys)
+ if not pk_predicate:
+ return None
+ return pk_predicate.to_arrow()
+ else:
+ return self.predicate.to_arrow()
+
@abstractmethod
def create_reader(self) -> RecordReader:
"""Create a record reader for the given split."""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 061303862c..31545e4ea4 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,14 +15,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Any, Iterator, List, Optional
+from typing import Iterator, List, Optional
import pandas
import pyarrow
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
-from pypaimon.read.push_down_utils import trim_predicate_by_fields
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.split import Split
from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
@@ -39,7 +38,6 @@ class TableRead:
self.table: FileStoreTable = table
self.predicate = predicate
- self.push_down_predicate = self._push_down_predicate()
self.read_type = read_type
def to_iterator(self, splits: List[Split]) -> Iterator:
@@ -108,23 +106,11 @@ class TableRead:
return ray.data.from_arrow(self.to_arrow(splits))
- def _push_down_predicate(self) -> Any:
- if self.predicate is None:
- return None
- elif self.table.is_primary_key_table:
- pk_predicate = trim_predicate_by_fields(self.predicate, self.table.primary_keys)
- if not pk_predicate:
- return None
- return pk_predicate.to_arrow()
- else:
- return self.predicate.to_arrow()
-
def _create_split_read(self, split: Split) -> SplitRead:
if self.table.is_primary_key_table and not split.raw_convertible:
return MergeFileSplitRead(
table=self.table,
predicate=self.predicate,
- push_down_predicate=self.push_down_predicate,
read_type=self.read_type,
split=split
)
@@ -132,7 +118,6 @@ class TableRead:
return DataEvolutionSplitRead(
table=self.table,
predicate=self.predicate,
- push_down_predicate=self.push_down_predicate,
read_type=self.read_type,
split=split
)
@@ -140,7 +125,6 @@ class TableRead:
return RawFileSplitRead(
table=self.table,
predicate=self.predicate,
- push_down_predicate=self.push_down_predicate,
read_type=self.read_type,
split=split
)
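
Taken together with the split_read.py change above, push-down derivation now lives inside SplitRead: TableRead simply forwards the original predicate, and each split read trims it to primary-key fields (for primary-key tables) before converting it to an Arrow expression. A hedged sketch of the trimming idea follows; the simplified predicate shape is an assumption for illustration, not the actual trim_predicate_by_fields in pypaimon.read.push_down_utils.

    # Sketch: keep only the conjuncts of an AND predicate whose field is allowed.
    # Hypothetical simplified shape; NOT the real pypaimon Predicate or push_down_utils.
    from dataclasses import dataclass, field
    from typing import List, Optional, Set

    @dataclass
    class Pred:                       # stand-in for pypaimon's Predicate
        method: str
        field_name: Optional[str] = None
        children: List['Pred'] = field(default_factory=list)

    def trim_by_fields(pred: Pred, allowed: Set[str]) -> Optional[Pred]:
        if pred.method == 'and':
            kept = [c for c in (trim_by_fields(ch, allowed) for ch in pred.children) if c]
            return Pred('and', children=kept) if kept else None
        return pred if pred.field_name in allowed else None

Callers, as the hunks above show, now construct split reads without a precomputed filter, e.g. RawFileSplitRead(table=table, predicate=predicate, read_type=read_type, split=split).
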
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index e34b9a5701..b711d2e695 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -83,7 +83,7 @@ class DataBlobWriter(DataWriter):
self.blob_column_name = self._get_blob_columns_from_schema()
# Split schema into normal and blob columns
- all_column_names = [field.name for field in self.table.table_schema.fields]
+ all_column_names = self.table.field_names
self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name]
self.write_cols = self.normal_column_names
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 6ee92082db..3515991933 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -44,7 +44,7 @@ class DataWriter(ABC):
self.file_io = self.table.file_io
self.trimmed_primary_key_fields = self.table.table_schema.get_trimmed_primary_key_fields()
- self.trimmed_primary_key = [field.name for field in self.trimmed_primary_key_fields]
+ self.trimmed_primary_key = self.table.table_schema.get_trimmed_primary_keys()
options = self.table.options
self.target_file_size = 256 * 1024 * 1024
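
Both writer changes replace hand-written name comprehensions with schema helpers (table.field_names and table_schema.get_trimmed_primary_keys()). Judging by the comprehensions they replace, the helpers presumably return the same name lists, roughly as below (an assumption inferred from the removed code, not the actual schema implementation):

    # Assumed equivalences, inferred from the removed comprehensions:
    def field_names(table_schema):
        return [f.name for f in table_schema.fields]

    def get_trimmed_primary_keys(table_schema):
        return [f.name for f in table_schema.get_trimmed_primary_key_fields()]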