This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 139f65cc8f10d478b8f0545382146f7e1cbc1b8c
Author: ChengHui Chen <[email protected]>
AuthorDate: Mon Oct 20 21:09:10 2025 +0800

    [Python] clean code for pypaimon (#6433)
---
 paimon-python/pypaimon/common/predicate.py         | 231 +++++++--------------
 paimon-python/pypaimon/read/split_read.py          |  21 +-
 paimon-python/pypaimon/read/table_read.py          |  18 +-
 .../pypaimon/write/writer/data_blob_writer.py      |   2 +-
 paimon-python/pypaimon/write/writer/data_writer.py |   2 +-
 5 files changed, 95 insertions(+), 179 deletions(-)

diff --git a/paimon-python/pypaimon/common/predicate.py 
b/paimon-python/pypaimon/common/predicate.py
index 4ca8644f6e..a245bb8e1e 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -50,95 +50,33 @@ class Predicate:
             literals=literals)
 
     def test(self, record: InternalRow) -> bool:
-        if self.method == 'equal':
-            return record.get_field(self.index) == self.literals[0]
-        elif self.method == 'notEqual':
-            return record.get_field(self.index) != self.literals[0]
-        elif self.method == 'lessThan':
-            return record.get_field(self.index) < self.literals[0]
-        elif self.method == 'lessOrEqual':
-            return record.get_field(self.index) <= self.literals[0]
-        elif self.method == 'greaterThan':
-            return record.get_field(self.index) > self.literals[0]
-        elif self.method == 'greaterOrEqual':
-            return record.get_field(self.index) >= self.literals[0]
-        elif self.method == 'isNull':
-            return record.get_field(self.index) is None
-        elif self.method == 'isNotNull':
-            return record.get_field(self.index) is not None
-        elif self.method == 'startsWith':
-            field_value = record.get_field(self.index)
-            if not isinstance(field_value, str):
-                return False
-            return field_value.startswith(self.literals[0])
-        elif self.method == 'endsWith':
-            field_value = record.get_field(self.index)
-            if not isinstance(field_value, str):
-                return False
-            return field_value.endswith(self.literals[0])
-        elif self.method == 'contains':
-            field_value = record.get_field(self.index)
-            if not isinstance(field_value, str):
-                return False
-            return self.literals[0] in field_value
-        elif self.method == 'in':
-            return record.get_field(self.index) in self.literals
-        elif self.method == 'notIn':
-            return record.get_field(self.index) not in self.literals
-        elif self.method == 'between':
-            field_value = record.get_field(self.index)
-            return self.literals[0] <= field_value <= self.literals[1]
-        elif self.method == 'and':
-            return all(p.test(record) for p in self.literals)
-        elif self.method == 'or':
-            t = any(p.test(record) for p in self.literals)
-            return t
-        else:
-            raise ValueError("Unsupported predicate method: 
{}".format(self.method))
-
-    def test_by_value(self, value: Any) -> bool:
         if self.method == 'and':
-            return all(p.test_by_value(value) for p in self.literals)
+            return all(p.test(record) for p in self.literals)
         if self.method == 'or':
-            t = any(p.test_by_value(value) for p in self.literals)
+            t = any(p.test(record) for p in self.literals)
             return t
 
-        if self.method == 'equal':
-            return value == self.literals[0]
-        if self.method == 'notEqual':
-            return value != self.literals[0]
-        if self.method == 'lessThan':
-            return value < self.literals[0]
-        if self.method == 'lessOrEqual':
-            return value <= self.literals[0]
-        if self.method == 'greaterThan':
-            return value > self.literals[0]
-        if self.method == 'greaterOrEqual':
-            return value >= self.literals[0]
-        if self.method == 'isNull':
-            return value is None
-        if self.method == 'isNotNull':
-            return value is not None
-        if self.method == 'startsWith':
-            if not isinstance(value, str):
-                return False
-            return value.startswith(self.literals[0])
-        if self.method == 'endsWith':
-            if not isinstance(value, str):
-                return False
-            return value.endswith(self.literals[0])
-        if self.method == 'contains':
-            if not isinstance(value, str):
-                return False
-            return self.literals[0] in value
-        if self.method == 'in':
-            return value in self.literals
-        if self.method == 'notIn':
-            return value not in self.literals
-        if self.method == 'between':
-            return self.literals[0] <= value <= self.literals[1]
-
-        raise ValueError("Unsupported predicate method: 
{}".format(self.method))
+        dispatch = {
+            'equal': lambda val, literals: val == literals[0],
+            'notEqual': lambda val, literals: val != literals[0],
+            'lessThan': lambda val, literals: val < literals[0],
+            'lessOrEqual': lambda val, literals: val <= literals[0],
+            'greaterThan': lambda val, literals: val > literals[0],
+            'greaterOrEqual': lambda val, literals: val >= literals[0],
+            'isNull': lambda val, literals: val is None,
+            'isNotNull': lambda val, literals: val is not None,
+            'startsWith': lambda val, literals: isinstance(val, str) and 
val.startswith(literals[0]),
+            'endsWith': lambda val, literals: isinstance(val, str) and 
val.endswith(literals[0]),
+            'contains': lambda val, literals: isinstance(val, str) and 
literals[0] in val,
+            'in': lambda val, literals: val in literals,
+            'notIn': lambda val, literals: val not in literals,
+            'between': lambda val, literals: literals[0] <= val <= literals[1],
+        }
+        func = dispatch.get(self.method)
+        if func:
+            field_value = record.get_field(self.index)
+            return func(field_value, self.literals)
+        raise ValueError(f"Unsupported predicate method: {self.method}")
 
     def test_by_simple_stats(self, stat: SimpleStats, row_count: int) -> bool:
         return self.test_by_stats({
@@ -169,66 +107,39 @@ class Predicate:
         max_value = stat["max_values"][self.field]
 
         if min_value is None or max_value is None or (null_count is not None 
and null_count == row_count):
-            return False
-
-        if self.method == 'equal':
-            return min_value <= self.literals[0] <= max_value
-        if self.method == 'notEqual':
-            return not (min_value == self.literals[0] == max_value)
-        if self.method == 'lessThan':
-            return self.literals[0] > min_value
-        if self.method == 'lessOrEqual':
-            return self.literals[0] >= min_value
-        if self.method == 'greaterThan':
-            return self.literals[0] < max_value
-        if self.method == 'greaterOrEqual':
-            return self.literals[0] <= max_value
-        if self.method == 'startsWith':
-            if not isinstance(min_value, str) or not isinstance(max_value, 
str):
-                raise RuntimeError("startsWith predicate on non-str field")
-            return ((min_value.startswith(self.literals[0]) or min_value < 
self.literals[0])
-                    and (max_value.startswith(self.literals[0]) or max_value > 
self.literals[0]))
-        if self.method == 'endsWith':
+            # invalid stats, skip validation
             return True
-        if self.method == 'contains':
-            return True
-        if self.method == 'in':
-            for literal in self.literals:
-                if min_value <= literal <= max_value:
-                    return True
-            return False
-        if self.method == 'notIn':
-            for literal in self.literals:
-                if min_value == literal == max_value:
-                    return False
-            return True
-        if self.method == 'between':
-            return self.literals[0] <= max_value and self.literals[1] >= 
min_value
-        else:
-            raise ValueError("Unsupported predicate method: 
{}".format(self.method))
+
+        dispatch = {
+            'equal': lambda literals: min_value <= literals[0] <= max_value,
+            'notEqual': lambda literals: not (min_value == literals[0] == 
max_value),
+            'lessThan': lambda literals: literals[0] > min_value,
+            'lessOrEqual': lambda literals: literals[0] >= min_value,
+            'greaterThan': lambda literals: literals[0] < max_value,
+            'greaterOrEqual': lambda literals: literals[0] <= max_value,
+            'in': lambda literals: any(min_value <= l <= max_value for l in 
literals),
+            'notIn': lambda literals: not any(min_value == l == max_value for 
l in literals),
+            'between': lambda literals: literals[0] <= max_value and 
literals[1] >= min_value,
+            'startsWith': lambda literals: ((isinstance(min_value, str) and 
isinstance(max_value, str)) and
+                                            
((min_value.startswith(literals[0]) or min_value < literals[0]) and
+                                             
(max_value.startswith(literals[0]) or max_value > literals[0]))),
+            'endsWith': lambda literals: True,
+            'contains': lambda literals: True,
+        }
+        func = dispatch.get(self.method)
+        if func:
+            return func(self.literals)
+        raise ValueError(f"Unsupported predicate method: {self.method}")
 
     def to_arrow(self) -> Any:
-        if self.method == 'equal':
-            return pyarrow_dataset.field(self.field) == self.literals[0]
-        elif self.method == 'notEqual':
-            return pyarrow_dataset.field(self.field) != self.literals[0]
-        elif self.method == 'lessThan':
-            return pyarrow_dataset.field(self.field) < self.literals[0]
-        elif self.method == 'lessOrEqual':
-            return pyarrow_dataset.field(self.field) <= self.literals[0]
-        elif self.method == 'greaterThan':
-            return pyarrow_dataset.field(self.field) > self.literals[0]
-        elif self.method == 'greaterOrEqual':
-            return pyarrow_dataset.field(self.field) >= self.literals[0]
-        elif self.method == 'isNull':
-            return pyarrow_dataset.field(self.field).is_null()
-        elif self.method == 'isNotNull':
-            return pyarrow_dataset.field(self.field).is_valid()
-        elif self.method == 'in':
-            return pyarrow_dataset.field(self.field).isin(self.literals)
-        elif self.method == 'notIn':
-            return ~pyarrow_dataset.field(self.field).isin(self.literals)
-        elif self.method == 'startsWith':
+        if self.method == 'and':
+            return reduce(lambda x, y: x & y,
+                          [p.to_arrow() for p in self.literals])
+        if self.method == 'or':
+            return reduce(lambda x, y: x | y,
+                          [p.to_arrow() for p in self.literals])
+
+        if self.method == 'startsWith':
             pattern = self.literals[0]
             # For PyArrow compatibility - improved approach
             try:
@@ -240,7 +151,7 @@ class Predicate:
             except Exception:
                 # Fallback to True
                 return pyarrow_dataset.field(self.field).is_valid() | 
pyarrow_dataset.field(self.field).is_null()
-        elif self.method == 'endsWith':
+        if self.method == 'endsWith':
             pattern = self.literals[0]
             # For PyArrow compatibility
             try:
@@ -252,7 +163,7 @@ class Predicate:
             except Exception:
                 # Fallback to True
                 return pyarrow_dataset.field(self.field).is_valid() | 
pyarrow_dataset.field(self.field).is_null()
-        elif self.method == 'contains':
+        if self.method == 'contains':
             pattern = self.literals[0]
             # For PyArrow compatibility
             try:
@@ -264,14 +175,24 @@ class Predicate:
             except Exception:
                 # Fallback to True
                 return pyarrow_dataset.field(self.field).is_valid() | 
pyarrow_dataset.field(self.field).is_null()
-        elif self.method == 'between':
-            return (pyarrow_dataset.field(self.field) >= self.literals[0]) & \
-                (pyarrow_dataset.field(self.field) <= self.literals[1])
-        elif self.method == 'and':
-            return reduce(lambda x, y: x & y,
-                          [p.to_arrow() for p in self.literals])
-        elif self.method == 'or':
-            return reduce(lambda x, y: x | y,
-                          [p.to_arrow() for p in self.literals])
-        else:
-            raise ValueError("Unsupported predicate method: 
{}".format(self.method))
+
+        field = pyarrow_dataset.field(self.field)
+        dispatch = {
+            'equal': lambda literals: field == literals[0],
+            'notEqual': lambda literals: field != literals[0],
+            'lessThan': lambda literals: field < literals[0],
+            'lessOrEqual': lambda literals: field <= literals[0],
+            'greaterThan': lambda literals: field > literals[0],
+            'greaterOrEqual': lambda literals: field >= literals[0],
+            'isNull': lambda literals: field.is_null(),
+            'isNotNull': lambda literals: field.is_valid(),
+            'in': lambda literals: field.isin(literals),
+            'notIn': lambda literals: ~field.isin(literals),
+            'between': lambda literals: (field >= literals[0]) & (field <= 
literals[1]),
+        }
+
+        func = dispatch.get(self.method)
+        if func:
+            return func(self.literals)
+
+        raise ValueError("Unsupported predicate method: 
{}".format(self.method))
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 5a8bc4825b..6b93b51d15 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,13 +19,14 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Any
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
+from pypaimon.read.push_down_utils import trim_predicate_by_fields
 from pypaimon.read.reader.concat_batch_reader import ConcatBatchReader, 
ShardBatchReader, MergeAllBatchReader
 from pypaimon.read.reader.concat_record_reader import ConcatRecordReader
 from pypaimon.read.reader.data_file_batch_reader import DataFileBatchReader
@@ -54,21 +55,31 @@ NULL_FIELD_INDEX = -1
 class SplitRead(ABC):
     """Abstract base class for split reading operations."""
 
-    def __init__(self, table, predicate: Optional[Predicate], 
push_down_predicate,
-                 read_type: List[DataField], split: Split):
+    def __init__(self, table, predicate: Optional[Predicate], read_type: 
List[DataField], split: Split):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
         self.predicate = predicate
-        self.push_down_predicate = push_down_predicate
+        self.push_down_predicate = self._push_down_predicate()
         self.split = split
         self.value_arity = len(read_type)
 
-        self.trimmed_primary_key = [field.name for field in 
self.table.table_schema.get_trimmed_primary_key_fields()]
+        self.trimmed_primary_key = 
self.table.table_schema.get_trimmed_primary_keys()
         self.read_fields = read_type
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
 
+    def _push_down_predicate(self) -> Any:
+        if self.predicate is None:
+            return None
+        elif self.table.is_primary_key_table:
+            pk_predicate = trim_predicate_by_fields(self.predicate, 
self.table.primary_keys)
+            if not pk_predicate:
+                return None
+            return pk_predicate.to_arrow()
+        else:
+            return self.predicate.to_arrow()
+
     @abstractmethod
     def create_reader(self) -> RecordReader:
         """Create a record reader for the given split."""
diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py
index 061303862c..31545e4ea4 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,14 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from typing import Any, Iterator, List, Optional
+from typing import Iterator, List, Optional
 
 import pandas
 import pyarrow
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
-from pypaimon.read.push_down_utils import trim_predicate_by_fields
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.split import Split
 from pypaimon.read.split_read import (MergeFileSplitRead, RawFileSplitRead,
@@ -39,7 +38,6 @@ class TableRead:
 
         self.table: FileStoreTable = table
         self.predicate = predicate
-        self.push_down_predicate = self._push_down_predicate()
         self.read_type = read_type
 
     def to_iterator(self, splits: List[Split]) -> Iterator:
@@ -108,23 +106,11 @@ class TableRead:
 
         return ray.data.from_arrow(self.to_arrow(splits))
 
-    def _push_down_predicate(self) -> Any:
-        if self.predicate is None:
-            return None
-        elif self.table.is_primary_key_table:
-            pk_predicate = trim_predicate_by_fields(self.predicate, 
self.table.primary_keys)
-            if not pk_predicate:
-                return None
-            return pk_predicate.to_arrow()
-        else:
-            return self.predicate.to_arrow()
-
     def _create_split_read(self, split: Split) -> SplitRead:
         if self.table.is_primary_key_table and not split.raw_convertible:
             return MergeFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
-                push_down_predicate=self.push_down_predicate,
                 read_type=self.read_type,
                 split=split
             )
@@ -132,7 +118,6 @@ class TableRead:
             return DataEvolutionSplitRead(
                 table=self.table,
                 predicate=self.predicate,
-                push_down_predicate=self.push_down_predicate,
                 read_type=self.read_type,
                 split=split
             )
@@ -140,7 +125,6 @@ class TableRead:
             return RawFileSplitRead(
                 table=self.table,
                 predicate=self.predicate,
-                push_down_predicate=self.push_down_predicate,
                 read_type=self.read_type,
                 split=split
             )
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py 
b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index e34b9a5701..b711d2e695 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -83,7 +83,7 @@ class DataBlobWriter(DataWriter):
         self.blob_column_name = self._get_blob_columns_from_schema()
 
         # Split schema into normal and blob columns
-        all_column_names = [field.name for field in 
self.table.table_schema.fields]
+        all_column_names = self.table.field_names
         self.normal_column_names = [col for col in all_column_names if col != 
self.blob_column_name]
         self.write_cols = self.normal_column_names
 
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index 6ee92082db..3515991933 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -44,7 +44,7 @@ class DataWriter(ABC):
 
         self.file_io = self.table.file_io
         self.trimmed_primary_key_fields = 
self.table.table_schema.get_trimmed_primary_key_fields()
-        self.trimmed_primary_key = [field.name for field in 
self.trimmed_primary_key_fields]
+        self.trimmed_primary_key = 
self.table.table_schema.get_trimmed_primary_keys()
 
         options = self.table.options
         self.target_file_size = 256 * 1024 * 1024

Reply via email to