This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8186e4b4c6 [python] Fix filter on data evolution table not working issue (#7211)
8186e4b4c6 is described below

commit 8186e4b4c6a0748a71baa5e9ecffbc7ab82aac66
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Feb 25 20:16:03 2026 +0800

    [python] Fix filter on data evolution table not working issue (#7211)
    
    Filter not working on data evolution read: when a predicate is provided,
    all rows are returned.
---
 .../read/reader/filter_record_batch_reader.py      |  88 ++++++
 .../pypaimon/read/scanner/file_scanner.py          |   7 +-
 paimon-python/pypaimon/read/split_read.py          |  32 +-
 paimon-python/pypaimon/read/table_read.py          |   4 +-
 .../pypaimon/tests/data_evolution_test.py          | 328 +++++++++++++++++++++
 5 files changed, 450 insertions(+), 9 deletions(-)
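
For context, a minimal sketch of the read path this patch fixes (not part of the
commit; it assumes an existing catalog handle and a table created with
'data-evolution.enabled' = 'true', mirroring the tests added below):

    table = catalog.get_table("default.test_filter_on_evolved_column")
    rb = table.new_read_builder()
    predicate = rb.new_predicate_builder().greater_than("c", 150)
    rb = rb.with_filter(predicate)
    splits = rb.new_scan().plan().splits()
    df = rb.new_read().to_pandas(splits)
    # Before this fix the predicate was silently ignored on data evolution
    # tables and every row came back; with it, only rows where c > 150 remain.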

diff --git a/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
new file mode 100644
index 0000000000..1d7991c4b2
--- /dev/null
+++ b/paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
@@ -0,0 +1,88 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+###############################################################################
+
+import logging
+from typing import List, Optional
+
+import pyarrow as pa
+import pyarrow.dataset as ds
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField
+
+logger = logging.getLogger(__name__)
+
+
+class FilterRecordBatchReader(RecordBatchReader):
+    """
+    Wraps a RecordBatchReader and filters each batch by predicate.
+    Used for data evolution read where predicate cannot be pushed down to
+    individual file readers. Only used when predicate columns are in read schema.
+    """
+
+    def __init__(
+        self,
+        reader: RecordBatchReader,
+        predicate: Predicate,
+        field_names: Optional[List[str]] = None,
+        schema_fields: Optional[List[DataField]] = None,
+    ):
+        self.reader = reader
+        self.predicate = predicate
+        self.field_names = field_names
+        self.schema_fields = schema_fields
+
+    def read_arrow_batch(self) -> Optional[pa.RecordBatch]:
+        while True:
+            batch = self.reader.read_arrow_batch()
+            if batch is None:
+                return None
+            if batch.num_rows == 0:
+                return batch
+            filtered = self._filter_batch(batch)
+            if filtered is not None and filtered.num_rows > 0:
+                return filtered
+            continue
+
+    def _filter_batch(self, batch: pa.RecordBatch) -> Optional[pa.RecordBatch]:
+        expr = self.predicate.to_arrow()
+        result = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(
+            filter=expr
+        ).to_table()
+        if result.num_rows == 0:
+            return None
+        batches = result.to_batches()
+        if not batches:
+            return None
+        if len(batches) == 1:
+            return batches[0]
+        concat_batches = getattr(pa, "concat_batches", None)
+        if concat_batches is not None:
+            return concat_batches(batches)
+        return pa.RecordBatch.from_arrays(
+            [result.column(i) for i in range(result.num_columns)],
+            schema=result.schema,
+        )
+
+    def return_batch_pos(self) -> int:
+        pos = getattr(self.reader, 'return_batch_pos', lambda: 0)()
+        return pos if pos is not None else 0
+
+    def close(self) -> None:
+        self.reader.close()
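
As a standalone illustration of the technique used by _filter_batch above (a
sketch, not part of the commit; a pyarrow.compute expression stands in for the
value returned by predicate.to_arrow()):

    import pyarrow as pa
    import pyarrow.compute as pc
    import pyarrow.dataset as ds

    batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3], "c": [100, None, 200]})
    expr = pc.field("c") > 150  # stand-in for predicate.to_arrow()
    # Wrap the batch in an in-memory dataset and let the scanner evaluate the
    # expression; rows where it is false or null are dropped.
    result = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(filter=expr).to_table()
    assert result.num_rows == 1 and result.column("id")[0].as_py() == 3
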
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 22ac889414..e56c568b95 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -414,13 +414,13 @@ class FileScanner:
             return False
         if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
             return False
-        if self.deletion_vectors_enabled and entry.file.level == 0:  # do not read level 0 file
-            return False
         # Get SimpleStatsEvolution for this schema
         evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
 
         # Apply evolution to stats
         if self.table.is_primary_key_table:
+            if self.deletion_vectors_enabled and entry.file.level == 0:  # do not read level 0 file
+                return False
             if not self.primary_key_predicate:
                 return True
             return self.primary_key_predicate.test_by_simple_stats(
@@ -432,6 +432,9 @@ class FileScanner:
                 return True
             if self.predicate_for_stats is None:
                 return True
+            # Data evolution: file stats may be from another schema, skip stats filter and filter in reader.
+            if self.data_evolution:
+                return True
             if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
                 stats_fields = entry.file.write_cols
             else:
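
Why the stats filter is skipped here: under data evolution the files behind one
logical split may have been written with different column sets, so a file's
min/max stats may not even mention the predicate column, and pruning such a file
would also drop the other columns of the merged rows. A hedged sketch with
made-up data (not Paimon API):

    # File written before the column was added: stats only cover "id" and "b".
    file_a_stats = {"id": (1, 3), "b": (10, 30)}
    # File written for the evolved column: stats only cover "c".
    file_b_stats = {"c": (100, 200)}

    def prunable_by_stats(stats, predicate_field):
        # A file whose stats lack the predicate column cannot be pruned safely.
        return predicate_field in stats

    assert not prunable_by_stats(file_a_stats, "c")
    # Hence all candidate files are kept and the predicate is applied after the
    # union/merge, in FilterRecordBatchReader.
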
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 9b218aa0b4..67a36b5fba 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -40,6 +40,7 @@ from pypaimon.read.reader.empty_record_reader import EmptyFileRecordReader
 from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
+from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader
 from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
@@ -52,6 +53,7 @@ from pypaimon.read.reader.key_value_unwrap_reader import \
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
+from pypaimon.read.push_down_utils import _get_all_fields
 from pypaimon.read.split import Split
 from pypaimon.read.sliced_split import SlicedSplit
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
@@ -88,6 +90,13 @@ class SplitRead(ABC):
             self.read_fields = self._create_key_value_fields(read_type)
         self.schema_id_2_fields = {}
         self.deletion_file_readers = {}
+        # Only apply filter when all predicate columns are in read schema.
+        read_names = {f.name for f in self.read_fields}
+        self.predicate_for_reader = (
+            self.predicate
+            if self.predicate is not None and _get_all_fields(self.predicate).issubset(read_names)
+            else None
+        )
 
     def _push_down_predicate(self) -> Optional[Predicate]:
         if self.predicate is None:
@@ -382,8 +391,8 @@ class RawFileSplitRead(SplitRead):
 
         concat_reader = ConcatBatchReader(data_readers)
         # if the table is appendonly table, we don't need extra filter, all predicates has pushed down
-        if self.table.is_primary_key_table and self.predicate:
-            return FilterRecordReader(concat_reader, self.predicate)
+        if self.table.is_primary_key_table and self.predicate_for_reader:
+            return FilterRecordReader(concat_reader, self.predicate_for_reader)
         else:
             return concat_reader
 
@@ -424,8 +433,8 @@ class MergeFileSplitRead(SplitRead):
             section_readers.append(supplier)
         concat_reader = ConcatRecordReader(section_readers)
         kv_unwrap_reader = KeyValueUnwrapRecordReader(DropDeleteRecordReader(concat_reader))
-        if self.predicate:
-            return FilterRecordReader(kv_unwrap_reader, self.predicate)
+        if self.predicate_for_reader:
+            return FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader)
         else:
             return kv_unwrap_reader
 
@@ -449,6 +458,11 @@ class DataEvolutionSplitRead(SplitRead):
             actual_split = split.data_split()
         super().__init__(table, predicate, read_type, actual_split, row_tracking_enabled)
 
+    def _push_down_predicate(self) -> Optional[Predicate]:
+        # Data evolution: files may have different schemas, so we don't push predicate
+        # to file readers; filtering is done in FilterRecordBatchReader after merge.
+        return None
+
     def create_reader(self) -> RecordReader:
         files = self.split.files
         suppliers = []
@@ -467,7 +481,15 @@ class DataEvolutionSplitRead(SplitRead):
                     lambda files=need_merge_files: self._create_union_reader(files)
                 )
 
-        return ConcatBatchReader(suppliers)
+        merge_reader = ConcatBatchReader(suppliers)
+        if self.predicate_for_reader is not None:
+            return FilterRecordBatchReader(
+                merge_reader,
+                self.predicate_for_reader,
+                field_names=[f.name for f in self.read_fields],
+                schema_fields=self.read_fields,
+            )
+        return merge_reader
 
     def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
         """Split files by firstRowId for data evolution."""
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index e0a442f305..e996eb97fe 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -66,7 +66,7 @@ class TableRead:
         num_rows = batch.num_rows
 
         for field in target_schema:
-            if field.name in batch.column_names:
+            if field.name in batch.schema.names:
                 col = batch.column(field.name)
             else:
                 col = pyarrow.nulls(num_rows, type=field.type)
@@ -198,7 +198,7 @@ class TableRead:
         elif self.table.options.data_evolution_enabled():
             return DataEvolutionSplitRead(
                 table=self.table,
-                predicate=None,  # Never push predicate to split read
+                predicate=self.predicate,
                 read_type=self.read_type,
                 split=split,
                 row_tracking_enabled=True
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py
index b86e544982..3759bfdb46 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -21,12 +21,60 @@ import tempfile
 import unittest
 from types import SimpleNamespace
 
+import pandas as pd
 import pyarrow as pa
+import pyarrow.dataset as ds
 
 from pypaimon import CatalogFactory, Schema
+from pypaimon.common.predicate import Predicate
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.read.read_builder import ReadBuilder
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.offset_row import OffsetRow
+
+
+def _filter_batch_arrow(batch, predicate):
+    expr = predicate.to_arrow()
+    table = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(filter=expr).to_table()
+    if table.num_rows == 0:
+        return batch.slice(0, 0)
+    batches = table.to_batches()
+    if len(batches) == 1:
+        return batches[0]
+    return pa.RecordBatch.from_arrays(
+        [table.column(i) for i in range(table.num_columns)], schema=table.schema
+    )
+
+
+def _filter_batch_row_by_row(batch, predicate, ncols):
+    nrows = batch.num_rows
+    mask = []
+    row_tuple = [None] * ncols
+    offset_row = OffsetRow(row_tuple, 0, ncols)
+    for i in range(nrows):
+        for j in range(ncols):
+            row_tuple[j] = batch.column(j)[i].as_py()
+        offset_row.replace(tuple(row_tuple))
+        try:
+            mask.append(predicate.test(offset_row))
+        except (TypeError, ValueError):
+            mask.append(False)
+    if not any(mask):
+        return batch.slice(0, 0)
+    return batch.filter(pa.array(mask))
+
+
+def _batches_equal(a, b):
+    if a.num_rows != b.num_rows or a.num_columns != b.num_columns:
+        return False
+    for i in range(a.num_columns):
+        col_a, col_b = a.column(i), b.column(i)
+        for j in range(a.num_rows):
+            va_py = col_a[j].as_py() if hasattr(col_a[j], "as_py") else col_a[j]
+            vb_py = col_b[j].as_py() if hasattr(col_b[j], "as_py") else col_b[j]
+            if va_py != vb_py:
+                return False
+    return True
 
 
 class DataEvolutionTest(unittest.TestCase):
@@ -513,6 +561,286 @@ class DataEvolutionTest(unittest.TestCase):
         }, schema=simple_pa_schema)
         self.assertEqual(actual, expect)
 
+    def _create_filter_test_table(self, table_name: str):
+        pa_schema = pa.schema([
+            ("id", pa.int64()),
+            ("b", pa.int32()),
+            pa.field("c", pa.int32(), nullable=True),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema, options={"row-tracking.enabled": "true", "data-evolution.enabled": "true"},
+        )
+        self.catalog.create_table(table_name, schema, ignore_if_exists=True)
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w0, c0 = wb.new_write().with_write_type(["id", "b"]), wb.new_commit()
+        w0.write_arrow(pa.Table.from_pydict(
+            {"id": [1, 2, 3], "b": [10, 20, 30]},
+            schema=pa.schema([("id", pa.int64()), ("b", pa.int32())]),
+        ))
+        c0.commit(w0.prepare_commit())
+        w0.close()
+        c0.close()
+        w1, c1 = wb.new_write().with_write_type(["c"]), wb.new_commit()
+        w1.write_arrow(pa.Table.from_pydict(
+            {"c": [100, None, 200]},
+            schema=pa.schema([pa.field("c", pa.int32(), nullable=True)]),
+        ))
+        cmts1 = w1.prepare_commit()
+        for cmt in cmts1:
+            for nf in cmt.new_files:
+                nf.first_row_id = 0
+        c1.commit(cmts1)
+        w1.close()
+        c1.close()
+        return table
+
+    def test_with_filter(self):
+        table = self._create_filter_test_table("default.test_filter_on_evolved_column")
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+
+        full_df = rb.new_read().to_pandas(splits)
+        self.assertEqual(len(full_df), 3, "Full scan must return 3 rows")
+        full_sorted = full_df.sort_values("id").reset_index(drop=True)
+        self.assertEqual(full_sorted["id"].tolist(), [1, 2, 3])
+        self.assertEqual(full_sorted["b"].tolist(), [10, 20, 30])
+        self.assertEqual(full_sorted["c"].iloc[0], 100)
+        self.assertTrue(pd.isna(full_sorted["c"].iloc[1]), "Row id=2 must have NULL c")
+        self.assertEqual(full_sorted["c"].iloc[2], 200)
+
+        predicate_gt = rb.new_predicate_builder().greater_than("c", 150)
+        rb_gt = table.new_read_builder().with_filter(predicate_gt)
+        result_gt = rb_gt.new_read().to_pandas(rb_gt.new_scan().plan().splits())
+        self.assertEqual(len(result_gt), 1, "Filter c > 150 should return 1 row (c=200)")
+        self.assertEqual(result_gt["id"].iloc[0], 3, "Row with c=200 must have id=3")
+        self.assertEqual(result_gt["b"].iloc[0], 30, "Row with c=200 must have b=30")
+        self.assertEqual(result_gt["c"].iloc[0], 200, "Filtered row must have c=200")
+
+        predicate_lt = rb.new_predicate_builder().less_than("c", 150)
+        rb_lt = table.new_read_builder().with_filter(predicate_lt)
+        result_lt = rb_lt.new_read().to_pandas(rb_lt.new_scan().plan().splits())
+        self.assertEqual(len(result_lt), 1, "Filter c < 150 should return 1 row (c=100)")
+        self.assertEqual(result_lt["id"].iloc[0], 1, "Row with c=100 must have id=1")
+        self.assertEqual(result_lt["c"].iloc[0], 100, "Filtered row must have c=100")
+
+        predicate_id = rb.new_predicate_builder().equal("id", 2)
+        rb_id = table.new_read_builder().with_filter(predicate_id)
+        result_id = rb_id.new_read().to_pandas(rb_id.new_scan().plan().splits())
+        self.assertEqual(len(result_id), 1, "Filter id == 2 should return 1 row")
+        self.assertEqual(result_id["id"].iloc[0], 2, "Filtered row must have id=2")
+        self.assertTrue(pd.isna(result_id["c"].iloc[0]), "Row id=2 must have c=NULL")
+
+        pb = rb.new_predicate_builder()
+        predicate_and = pb.and_predicates([
+            pb.greater_than("c", 50),
+            pb.less_than("c", 150),
+        ])
+        rb_and = table.new_read_builder().with_filter(predicate_and)
+        result_and = rb_and.new_read().to_pandas(rb_and.new_scan().plan().splits())
+        self.assertEqual(
+            len(result_and), 1,
+            "Filter c>50 AND c<150 should return 1 row (c=100)",
+        )
+        self.assertEqual(result_and["id"].iloc[0], 1, "Row with c=100 must 
have id=1")
+        self.assertEqual(result_and["c"].iloc[0], 100, "Filtered row must have 
c=100")
+
+        predicate_is_null = rb.new_predicate_builder().is_null("c")
+        rb_null = table.new_read_builder().with_filter(predicate_is_null)
+        result_null = rb_null.new_read().to_pandas(rb_null.new_scan().plan().splits())
+        self.assertEqual(len(result_null), 1, "Filter c IS NULL should return 1 row (id=2)")
+        self.assertEqual(result_null["id"].iloc[0], 2, "NULL row must have id=2")
+        self.assertTrue(pd.isna(result_null["c"].iloc[0]), "Filtered row c must be NULL")
+
+        predicate_not_null = rb.new_predicate_builder().is_not_null("c")
+        rb_not_null = table.new_read_builder().with_filter(predicate_not_null)
+        result_not_null = rb_not_null.new_read().to_pandas(
+            rb_not_null.new_scan().plan().splits())
+        self.assertEqual(
+            len(result_not_null), 2,
+            "Filter c IS NOT NULL should return 2 rows (id=1, id=3)",
+        )
+        result_not_null_sorted = result_not_null.sort_values("id").reset_index(drop=True)
+        self.assertEqual(result_not_null_sorted["id"].tolist(), [1, 3])
+        self.assertEqual(result_not_null_sorted["c"].tolist(), [100, 200])
+
+        predicate_or = pb.or_predicates([
+            pb.greater_than("c", 150),
+            pb.less_than("c", 100),
+        ])
+        rb_or = table.new_read_builder().with_filter(predicate_or)
+        result_or = rb_or.new_read().to_pandas(rb_or.new_scan().plan().splits())
+        self.assertEqual(
+            len(result_or), 1,
+            "Filter c>150 OR c<100 should return 1 row (id=3, c=200)",
+        )
+        self.assertEqual(result_or["id"].iloc[0], 3, "Row with c=200 must have 
id=3")
+        self.assertEqual(result_or["c"].iloc[0], 200, "Filtered row must have 
c=200")
+
+    def test_with_filter_and_projection(self):
+        table = self._create_filter_test_table("default.test_filter_and_projection_evolved")
+        rb_full = table.new_read_builder()
+        predicate = rb_full.new_predicate_builder().greater_than("c", 150)
+        rb_filtered = table.new_read_builder().with_projection(["c", "id"]).with_filter(predicate)
+        result = rb_filtered.new_read().to_pandas(rb_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result), 1, "Filter c > 150 with projection [c, id] should return 1 row")
+        self.assertEqual(result["id"].iloc[0], 3)
+        self.assertEqual(result["c"].iloc[0], 200)
+        for _, row in result.iterrows():
+            self.assertGreater(
+                row["c"],
+                150,
+                "Each row must satisfy predicate c > 150 (row-by-row path uses 
predicate.index; "
+                "if schema_fields != read_type, wrong column is compared).",
+            )
+
+        predicate2 = rb_full.new_predicate_builder().is_null("c")
+        rb2_filtered = table.new_read_builder().with_projection(["id", "c"]).with_filter(predicate2)
+        result2 = rb2_filtered.new_read().to_pandas(rb2_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result2), 1, "Filter c IS NULL with projection [id, c] should return 1 row")
+        self.assertEqual(result2["id"].iloc[0], 2)
+        self.assertTrue(pd.isna(result2["c"].iloc[0]))
+
+        predicate3 = rb_full.new_predicate_builder().greater_than("c", 50)
+        rb3_filtered = table.new_read_builder().with_projection(["c"]).with_filter(predicate3)
+        result3 = rb3_filtered.new_read().to_pandas(rb3_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result3), 2, "Filter c > 50 with projection [c] should return 2 rows (c=100, 200)")
+        self.assertEqual(sorted(result3["c"].tolist()), [100, 200])
+
+        # Build predicate from same read_type as projection [id, c] so indices match (c at index 1).
+        rb4 = table.new_read_builder().with_projection(["id", "c"])
+        pb4 = rb4.new_predicate_builder()
+        predicate_compound = pb4.and_predicates([
+            pb4.greater_than("c", 150),
+            pb4.is_not_null("c"),
+        ])
+        rb4_filtered = rb4.with_filter(predicate_compound)
+        result4 = rb4_filtered.new_read().to_pandas(rb4_filtered.new_scan().plan().splits())
+        self.assertEqual(len(result4), 1, "Filter c>150 AND c IS NOT NULL with projection [id,c] should return 1 row")
+        self.assertEqual(result4["id"].iloc[0], 3)
+        self.assertEqual(result4["c"].iloc[0], 200)
+
+        predicate_filter_on_non_projected = rb_full.new_predicate_builder().greater_than("c", 150)
+        rb_non_projected = table.new_read_builder().with_projection(["id"]).with_filter(
+            predicate_filter_on_non_projected
+        )
+        result_non_projected = rb_non_projected.new_read().to_pandas(
+            rb_non_projected.new_scan().plan().splits()
+        )
+        self.assertEqual(
+            len(result_non_projected),
+            3,
+            "Filter c > 150 with projection [id]: c not in read_type so filter 
is dropped, all 3 rows returned.",
+        )
+        self.assertEqual(
+            list(result_non_projected.columns),
+            ["id"],
+            "Projection [id] should return only id column.",
+        )
+        table_read = rb_non_projected.new_read()
+        splits = rb_non_projected.new_scan().plan().splits()
+        expected_output_arity = len(table_read.read_type)
+        try:
+            rows_from_iterator = list(table_read.to_iterator(splits))
+        except ValueError as e:
+            if "Expected Arrow table or array" in str(e):
+                self.skipTest(
+                    "RecordBatchReader path uses 
polars.from_arrow(RecordBatch) which fails; "
+                    "skip to_iterator projection assertion on this path"
+                )
+            raise
+        self.assertEqual(len(rows_from_iterator), 3, "to_iterator should return same row count as to_pandas")
+        for row in rows_from_iterator:
+            self.assertIsInstance(row, OffsetRow)
+            self.assertEqual(
+                row.arity,
+                expected_output_arity,
+                "to_iterator must yield rows with only read_type columns 
(arity=%d)."
+                % expected_output_arity,
+            )
+
+    def test_null_predicate_arrow_vs_row_by_row(self):
+        schema = pa.schema([("id", pa.int64()), ("c", pa.int64())])
+        batch = pa.RecordBatch.from_pydict(
+            {"id": [1, 2, 3], "c": [10, None, 20]},
+            schema=schema,
+        )
+        ncols = 2
+
+        # is_null('c'): Arrow and row-by-row must return same rows
+        pred_is_null = Predicate(method="isNull", index=1, field="c", literals=None)
+        arrow_res = _filter_batch_arrow(batch, pred_is_null)
+        row_res = _filter_batch_row_by_row(batch, pred_is_null, ncols)
+        self.assertEqual(arrow_res.num_rows, row_res.num_rows)
+        self.assertTrue(_batches_equal(arrow_res, row_res))
+        self.assertEqual(arrow_res.num_rows, 1)
+        self.assertEqual(arrow_res.column("id")[0].as_py(), 2)
+        self.assertIsNone(arrow_res.column("c")[0].as_py())
+
+        # is_not_null('c'): Arrow and row-by-row must return same rows
+        pred_not_null = Predicate(method="isNotNull", index=1, field="c", literals=None)
+        arrow_res2 = _filter_batch_arrow(batch, pred_not_null)
+        row_res2 = _filter_batch_row_by_row(batch, pred_not_null, ncols)
+        self.assertEqual(arrow_res2.num_rows, row_res2.num_rows)
+        self.assertTrue(_batches_equal(arrow_res2, row_res2))
+        self.assertEqual(arrow_res2.num_rows, 2)
+
+        pred_eq_null = Predicate(method="equal", index=1, field="c", literals=[None])
+        row_res3 = _filter_batch_row_by_row(batch, pred_eq_null, ncols)
+        self.assertEqual(row_res3.num_rows, 0)  # Paimon: val is None -> False, no row matches
+        arrow_res3 = _filter_batch_arrow(batch, pred_eq_null)
+        self.assertEqual(arrow_res3.num_rows, 0)  # Arrow: NULL==NULL is null, filtered out
+        self.assertEqual(arrow_res3.num_rows, row_res3.num_rows)
+
+    def test_filter_row_by_row_mismatched_schema(self):
+        batch = pa.RecordBatch.from_pydict(
+            {"c": [1, 200, 50], "id": [100, 2, 3]},
+            schema=pa.schema([("c", pa.int64()), ("id", pa.int64())]),
+        )
+        pred = Predicate(method="greaterThan", index=0, field="c", literals=[150])
+
+        ncols = 3
+        nrows = batch.num_rows
+        id_col = batch.column("id")
+        c_col = batch.column("c")
+        row_tuple = [None] * ncols
+        offset_row = OffsetRow(row_tuple, 0, ncols)
+        mask = []
+        for i in range(nrows):
+            row_tuple[0] = id_col[i].as_py()
+            row_tuple[1] = None
+            row_tuple[2] = c_col[i].as_py()
+            offset_row.replace(tuple(row_tuple))
+            try:
+                mask.append(pred.test(offset_row))
+            except (TypeError, ValueError):
+                mask.append(False)
+        rows_passing_wrong_layout = sum(mask)
+        self.assertEqual(
+            rows_passing_wrong_layout,
+            0,
+            "With wrong layout (position 0 = id), predicate c > 150 becomes id 
> 150 -> 0 rows. "
+            "This reproduces FilterRecordBatchReader bug when 
schema_fields=table.fields.",
+        )
+        ncols_right = 2
+        row_tuple_right = [None] * ncols_right
+        offset_row_right = OffsetRow(row_tuple_right, 0, ncols_right)
+        mask_right = []
+        for i in range(nrows):
+            row_tuple_right[0] = c_col[i].as_py()
+            row_tuple_right[1] = id_col[i].as_py()
+            offset_row_right.replace(tuple(row_tuple_right))
+            try:
+                mask_right.append(pred.test(offset_row_right))
+            except (TypeError, ValueError):
+                mask_right.append(False)
+        rows_passing_right_layout = sum(mask_right)
+        self.assertEqual(
+            rows_passing_right_layout,
+            1,
+            "With correct layout (position 0 = c), predicate c > 150 -> 1 row 
(c=200).",
+        )
+
     def test_null_values(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),
