This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 73825eb63184fda93ced18abeb6ac69f2e2afd8f
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 31 16:45:53 2026 +0800

    [python] Push down row ranges to vortex reader (#7558)
    
    Push the row ranges down to the Vortex reader layer and use vortex_file.
    scan (indices=...) for native row filtering to avoid filtering after
    full read. The approach is sound, and the performance benefits are
    significant.
---
 .../pypaimon/benchmark/clickbench_format.py        | 374 +++++++++++++++++++++
 .../pypaimon/read/reader/format_vortex_reader.py   | 130 +++++--
 paimon-python/pypaimon/read/split_read.py          |  54 ++-
 .../pypaimon/tests/data_evolution_test.py          | 156 +++++++++
 .../pypaimon/tests/reader_append_only_test.py      |  44 ++-
 5 files changed, 702 insertions(+), 56 deletions(-)

diff --git a/paimon-python/pypaimon/benchmark/clickbench_format.py 
b/paimon-python/pypaimon/benchmark/clickbench_format.py
new file mode 100644
index 0000000000..9f4bc5c22c
--- /dev/null
+++ b/paimon-python/pypaimon/benchmark/clickbench_format.py
@@ -0,0 +1,374 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""
+Clickbench benchmark for Paimon file formats.
+
+Downloads the Clickbench hits.parquet dataset and compares compression ratios
+and read performance across Parquet, ORC, and Vortex formats in Paimon tables.
+
+Usage:
+    python pypaimon/benchmark/clickbench_format.py [--rows N] [--data-path PATH]
+
+    --rows N         Number of rows to use (default: 3_000_000, use 0 for full dataset)
+    --data-path PATH Path to existing hits.parquet (skips download if provided)
+"""
+
+import argparse
+import os
+import random
+import shutil
+import sys
+import tempfile
+import time
+from pathlib import Path
+
+import pyarrow as pa
+import pyarrow.parquet as pq
+
+# Add project root to path
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.globalindex.indexed_split import IndexedSplit
+from pypaimon.utils.range import Range
+
+CLICKBENCH_URL = "https://datasets.clickhouse.com/hits_compatible/hits.parquet"
+
+# Formats to benchmark
+FORMATS = ["parquet", "orc", "lance", "vortex"]
+
+
+def download_clickbench(dest_path: str):
+    """Download Clickbench hits.parquet if not already present."""
+    if os.path.exists(dest_path):
+        print(f"[INFO] Using cached dataset: {dest_path}")
+        return
+    print(f"[INFO] Downloading Clickbench dataset to {dest_path} ...")
+    print(f"[INFO] URL: {CLICKBENCH_URL}")
+    print("[INFO] This is ~14GB, may take a while.")
+
+    import urllib.request
+    urllib.request.urlretrieve(CLICKBENCH_URL, dest_path)
+    print("[INFO] Download complete.")
+
+
+def load_data(data_path: str, max_rows: int) -> pa.Table:
+    """Load Clickbench parquet data, optionally limiting rows."""
+    print(f"[INFO] Loading data from {data_path} ...")
+    pf = pq.ParquetFile(data_path)
+    if max_rows > 0:
+        # Read enough row groups to satisfy max_rows
+        batches = []
+        total = 0
+        for rg_idx in range(pf.metadata.num_row_groups):
+            rg = pf.read_row_group(rg_idx)
+            batches.append(rg)
+            total += rg.num_rows
+            if total >= max_rows:
+                break
+        table = pa.concat_tables(batches)
+        if table.num_rows > max_rows:
+            table = table.slice(0, max_rows)
+    else:
+        table = pf.read()
+
+    # Cast unsigned integer types to signed (Paimon doesn't support unsigned)
+    uint_to_int = {
+        pa.uint8(): pa.int16(),
+        pa.uint16(): pa.int32(),
+        pa.uint32(): pa.int64(),
+        pa.uint64(): pa.int64(),
+    }
+    new_fields = []
+    for field in table.schema:
+        if field.type in uint_to_int:
+            new_fields.append((field.name, uint_to_int[field.type]))
+    if new_fields:
+        print(f"[INFO] Casting {len(new_fields)} unsigned int columns to 
signed")
+        for name, target_type in new_fields:
+            table = table.set_column(
+                table.schema.get_field_index(name),
+                pa.field(name, target_type, 
nullable=table.schema.field(name).nullable),
+                table.column(name).cast(target_type),
+            )
+
+    print(f"[INFO] Loaded {table.num_rows:,} rows, {table.num_columns} 
columns")
+    print(f"[INFO] In-memory size: {table.nbytes / 1024 / 1024:.1f} MB")
+    return table
+
+
+def get_dir_size(path: str) -> int:
+    """Get total size of all files under a directory."""
+    total = 0
+    for dirpath, _, filenames in os.walk(path):
+        for f in filenames:
+            fp = os.path.join(dirpath, f)
+            if os.path.isfile(fp):
+                total += os.path.getsize(fp)
+    return total
+
+
+def write_paimon_table(catalog, table_name: str, pa_schema: pa.Schema,
+                       data: pa.Table, file_format: str) -> dict:
+    """Write data to a Paimon table and return metrics."""
+    schema = Schema.from_pyarrow_schema(pa_schema, options={
+        'file.format': file_format,
+        'data-evolution.enabled': 'true',
+        'row-tracking.enabled': 'true',
+    })
+    catalog.create_table(f'default.{table_name}', schema, False)
+    table = catalog.get_table(f'default.{table_name}')
+
+    write_builder = table.new_batch_write_builder()
+    table_write = write_builder.new_write()
+    table_commit = write_builder.new_commit()
+
+    t0 = time.time()
+    table_write.write_arrow(data)
+    table_commit.commit(table_write.prepare_commit())
+    write_time = time.time() - t0
+
+    table_write.close()
+    table_commit.close()
+
+    return {
+        'write_time': write_time,
+    }
+
+
+def read_paimon_table(catalog, table_name: str) -> dict:
+    """Read all data from a Paimon table and return metrics."""
+    table = catalog.get_table(f'default.{table_name}')
+    read_builder = table.new_read_builder()
+    table_scan = read_builder.new_scan()
+    table_read = read_builder.new_read()
+
+    t0 = time.time()
+    result = table_read.to_arrow(table_scan.plan().splits())
+    read_time = time.time() - t0
+
+    return {
+        'read_time': read_time,
+        'num_rows': result.num_rows,
+    }
+
+
+def point_lookup_paimon_table(catalog, table_name: str, num_lookups: int, 
rows_per_lookup: int) -> dict:
+    """Run row-ID-based point lookups on a data-evolution Paimon table."""
+    table = catalog.get_table(f'default.{table_name}')
+    read_builder = table.new_read_builder()
+    scan = read_builder.new_scan()
+    splits = scan.plan().splits()
+
+    # Determine total row range from splits
+    total_rows = sum(s.row_count for s in splits)
+
+    # Generate random row ID ranges for lookups
+    random.seed(42)
+    lookup_ranges = []
+    for _ in range(num_lookups):
+        start = random.randint(0, total_rows - rows_per_lookup)
+        lookup_ranges.append(Range(start, start + rows_per_lookup - 1))
+
+    total_result_rows = 0
+    t0 = time.time()
+    for rng in lookup_ranges:
+        indexed_splits = []
+        for s in splits:
+            # Intersect the lookup range with each split's file ranges
+            file_ranges = []
+            for f in s.files:
+                if f.first_row_id is not None:
+                    file_ranges.append(Range(f.first_row_id, f.first_row_id + 
f.row_count - 1))
+            split_range = Range.and_([rng], file_ranges)
+            if split_range:
+                indexed_splits.append(IndexedSplit(s, [rng]))
+
+        if indexed_splits:
+            read = read_builder.new_read()
+            result = read.to_arrow(indexed_splits)
+            total_result_rows += result.num_rows
+
+    lookup_time = time.time() - t0
+
+    return {
+        'lookup_time': lookup_time,
+        'num_lookups': num_lookups,
+        'total_rows': total_result_rows,
+    }
+
+
+def predicate_lookup_paimon_table(catalog, table_name: str, num_lookups: int, 
rows_per_lookup: int) -> dict:
+    """Run predicate-based point lookups using _ROW_ID filter on a 
data-evolution Paimon table."""
+    table = catalog.get_table(f'default.{table_name}')
+
+    # Get total rows to generate random ranges
+    rb = table.new_read_builder()
+    splits = rb.new_scan().plan().splits()
+    total_rows = sum(s.row_count for s in splits)
+
+    # Build predicate builder with _ROW_ID in projection
+    all_cols = [f.name for f in table.fields] + ['_ROW_ID']
+    pb = 
table.new_read_builder().with_projection(all_cols).new_predicate_builder()
+
+    random.seed(42)
+    total_result_rows = 0
+    t0 = time.time()
+    for _ in range(num_lookups):
+        start = random.randint(0, total_rows - rows_per_lookup)
+        end = start + rows_per_lookup - 1
+        pred = pb.between('_ROW_ID', start, end)
+        filtered_rb = table.new_read_builder().with_filter(pred)
+        scan = filtered_rb.new_scan()
+        read = filtered_rb.new_read()
+        result = read.to_arrow(scan.plan().splits())
+        total_result_rows += result.num_rows
+
+    lookup_time = time.time() - t0
+
+    return {
+        'lookup_time': lookup_time,
+        'num_lookups': num_lookups,
+        'total_rows': total_result_rows,
+    }
+
+
+def run_benchmark(data: pa.Table, warehouse_dir: str):
+    """Run the compression benchmark across all formats."""
+    catalog = CatalogFactory.create({'warehouse': warehouse_dir})
+    catalog.create_database('default', True)
+
+    pa_schema = data.schema
+    in_memory_mb = data.nbytes / 1024 / 1024
+    results = {}
+
+    num_lookups = 20
+    rows_per_lookup = 100
+
+    for fmt in FORMATS:
+        table_name = f"clickbench_{fmt}"
+        print(f"\n{'='*60}")
+        print(f"  Format: {fmt.upper()}")
+        print(f"{'='*60}")
+
+        # Write
+        print(f"  Writing {data.num_rows:,} rows ...")
+        write_metrics = write_paimon_table(catalog, table_name, pa_schema, 
data, fmt)
+        print(f"  Write time: {write_metrics['write_time']:.2f}s")
+
+        # Measure on-disk size
+        table = catalog.get_table(f'default.{table_name}')
+        table_path = table.table_path
+        disk_size = get_dir_size(table_path)
+        disk_mb = disk_size / 1024 / 1024
+        ratio = in_memory_mb / disk_mb if disk_mb > 0 else 0
+        print(f"  On-disk size: {disk_mb:.1f} MB  (ratio: {ratio:.2f}x)")
+
+        # Full read
+        print("  Reading back ...")
+        read_metrics = read_paimon_table(catalog, table_name)
+        print(f"  Read time: {read_metrics['read_time']:.2f}s")
+        print(f"  Rows read: {read_metrics['num_rows']:,}")
+
+        # Point lookups by row ID
+        print(f"  Point lookups ({num_lookups} queries, {rows_per_lookup} rows 
each) ...")
+        lookup_metrics = point_lookup_paimon_table(catalog, table_name, 
num_lookups, rows_per_lookup)
+        print(f"  Lookup time: {lookup_metrics['lookup_time']:.2f}s")
+        print(f"  Total rows matched: {lookup_metrics['total_rows']:,}")
+
+        # Predicate-based point lookups by _ROW_ID
+        print(f"  Predicate lookups ({num_lookups} queries, {rows_per_lookup} 
rows each) ...")
+        pred_metrics = predicate_lookup_paimon_table(catalog, table_name, 
num_lookups, rows_per_lookup)
+        print(f"  Predicate lookup time: {pred_metrics['lookup_time']:.2f}s")
+        print(f"  Total rows matched: {pred_metrics['total_rows']:,}")
+
+        results[fmt] = {
+            'write_time': write_metrics['write_time'],
+            'read_time': read_metrics['read_time'],
+            'disk_mb': disk_mb,
+            'ratio': ratio,
+            'lookup_time': lookup_metrics['lookup_time'],
+            'lookup_rows': lookup_metrics['total_rows'],
+            'pred_lookup_time': pred_metrics['lookup_time'],
+            'pred_lookup_rows': pred_metrics['total_rows'],
+        }
+
+    return results
+
+
+def print_summary(results: dict, in_memory_mb: float, num_rows: int):
+    """Print a summary comparison table."""
+    print(f"\n{'='*80}")
+    print("  CLICKBENCH COMPRESSION BENCHMARK SUMMARY")
+    print(f"  Rows: {num_rows:,}  |  In-memory: {in_memory_mb:.1f} MB")
+    print(f"{'='*80}")
+    print(f"  {'Format':<10} {'Disk (MB)':>10} {'Ratio':>8} {'Write (s)':>10} "
+          f"{'Read (s)':>10} {'Lookup (s)':>11} {'Pred (s)':>10}")
+    print(f"  {'-'*69}")
+
+    for fmt in FORMATS:
+        if fmt in results:
+            r = results[fmt]
+            print(f"  {fmt:<10} {r['disk_mb']:>10.1f} {r['ratio']:>7.2f}x 
{r['write_time']:>10.2f} "
+                  f"{r['read_time']:>10.2f} {r['lookup_time']:>11.2f} 
{r['pred_lookup_time']:>10.2f}")
+
+    # Vortex vs Parquet comparison
+    if 'vortex' in results and 'parquet' in results:
+        v = results['vortex']
+        p = results['parquet']
+        print("\n  Vortex vs Parquet:")
+        print(f"    Size:   {v['disk_mb'] / p['disk_mb'] * 100:.1f}% of 
Parquet")
+        print(f"    Write:  {v['write_time'] / p['write_time']:.2f}x")
+        print(f"    Read:   {v['read_time'] / p['read_time']:.2f}x")
+        print(f"    Lookup: {v['lookup_time'] / p['lookup_time']:.2f}x")
+        print(f"    Pred:   {v['pred_lookup_time'] / 
p['pred_lookup_time']:.2f}x")
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Clickbench compression 
benchmark for Paimon")
+    parser.add_argument("--rows", type=int, default=3_000_000,
+                        help="Number of rows to use (0 = full dataset, 
default: 3000000)")
+    parser.add_argument("--data-path", type=str, default=None,
+                        help="Path to existing hits.parquet (skips download)")
+    args = parser.parse_args()
+
+    # Resolve data path
+    if args.data_path:
+        data_path = args.data_path
+    else:
+        cache_dir = os.path.join(Path.home(), ".cache", "paimon-bench")
+        os.makedirs(cache_dir, exist_ok=True)
+        data_path = os.path.join(cache_dir, "hits.parquet")
+        download_clickbench(data_path)
+
+    data = load_data(data_path, args.rows)
+    in_memory_mb = data.nbytes / 1024 / 1024
+
+    warehouse_dir = tempfile.mkdtemp(prefix="paimon_bench_")
+    print(f"[INFO] Warehouse: {warehouse_dir}")
+
+    try:
+        results = run_benchmark(data, warehouse_dir)
+        print_summary(results, in_memory_mb, data.num_rows)
+    finally:
+        shutil.rmtree(warehouse_dir, ignore_errors=True)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py 
b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
index d0bf709e77..fd8f5d8dab 100644
--- a/paimon-python/pypaimon/read/reader/format_vortex_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py
@@ -16,14 +16,15 @@
 # limitations under the License.
 
################################################################################
 
-from typing import List, Optional, Any
+from typing import List, Optional, Any, Set
 
 import pyarrow as pa
-import pyarrow.dataset as ds
 from pyarrow import RecordBatch
 
 from pypaimon.common.file_io import FileIO
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.special_fields import SpecialFields
 
 
 class FormatVortexReader(RecordBatchReader):
@@ -32,8 +33,10 @@ class FormatVortexReader(RecordBatchReader):
     and filters it based on the provided predicate and projection.
     """
 
-    def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 push_down_predicate: Any, batch_size: int = 1024):
+    def __init__(self, file_io: FileIO, file_path: str, read_fields: 
List[DataField],
+                 push_down_predicate: Any, batch_size: int = 1024,
+                 row_indices: Optional[Any] = None,
+                 predicate_fields: Optional[Set[str]] = None):
         import vortex
 
         from pypaimon.read.reader.vortex_utils import to_vortex_specified
@@ -46,45 +49,104 @@ class FormatVortexReader(RecordBatchReader):
         else:
             vortex_file = vortex.open(file_path_for_vortex)
 
-        columns_for_vortex = read_fields if read_fields else None
-        pa_table = vortex_file.to_arrow(columns_for_vortex).read_all()
+        self.read_fields = read_fields
+        self._read_field_names = [f.name for f in read_fields]
 
-        # Vortex exports string_view which some PyArrow kernels don't support 
yet.
-        pa_table = self._cast_string_view_columns(pa_table)
+        # Identify which fields exist in the file and which are missing
+        file_schema_names = set(vortex_file.dtype.to_arrow_schema().names)
+        self.existing_fields = [f.name for f in read_fields if f.name in 
file_schema_names]
+        self.missing_fields = [f.name for f in read_fields if f.name not in 
file_schema_names]
 
+        columns_for_vortex = self.existing_fields if self.existing_fields else 
None
+
+        # Try to convert Arrow predicate to Vortex expr for native push-down
+        vortex_expr = None
         if push_down_predicate is not None:
-            in_memory_dataset = ds.InMemoryDataset(pa_table)
-            scanner = in_memory_dataset.scanner(filter=push_down_predicate, 
batch_size=batch_size)
-            self.reader = scanner.to_reader()
-        else:
-            self.reader = iter(pa_table.to_batches(max_chunksize=batch_size))
+            try:
+                from vortex.arrow.expression import arrow_to_vortex
+                arrow_schema = vortex_file.dtype.to_arrow_schema()
+                vortex_expr = arrow_to_vortex(push_down_predicate, 
arrow_schema)
+            except Exception:
+                pass
+
+        indices = None
+        if row_indices is not None:
+            indices = vortex.array(row_indices)
+
+        self.record_batch_reader = vortex_file.scan(
+            columns_for_vortex, expr=vortex_expr, indices=indices, 
batch_size=batch_size).to_arrow()
+
+        self._output_schema = (
+            PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields 
else None
+        )
+
+        # Collect predicate-referenced fields for targeted view type casting
+        self._cast_fields = predicate_fields if predicate_fields and 
vortex_expr is not None else set()
 
     @staticmethod
-    def _cast_string_view_columns(table: pa.Table) -> pa.Table:
-        new_fields = []
-        needs_cast = False
-        for field in table.schema:
-            if field.type == pa.string_view():
-                new_fields.append(field.with_type(pa.utf8()))
-                needs_cast = True
-            elif field.type == pa.binary_view():
-                new_fields.append(field.with_type(pa.binary()))
-                needs_cast = True
-            else:
-                new_fields.append(field)
-        if not needs_cast:
-            return table
-        return table.cast(pa.schema(new_fields))
+    def _cast_view_types(batch: RecordBatch, target_fields: Set[str]) -> 
RecordBatch:
+        """Cast string_view/binary_view columns to string/binary, only for 
target fields."""
+        if not target_fields:
+            return batch
+        columns = []
+        fields = []
+        changed = False
+        for i in range(batch.num_columns):
+            col = batch.column(i)
+            field = batch.schema.field(i)
+            if field.name in target_fields:
+                if col.type == pa.string_view():
+                    col = col.cast(pa.utf8())
+                    field = field.with_type(pa.utf8())
+                    changed = True
+                elif col.type == pa.binary_view():
+                    col = col.cast(pa.binary())
+                    field = field.with_type(pa.binary())
+                    changed = True
+            columns.append(col)
+            fields.append(field)
+        if changed:
+            return pa.RecordBatch.from_arrays(columns, 
schema=pa.schema(fields))
+        return batch
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         try:
-            if hasattr(self.reader, 'read_next_batch'):
-                return self.reader.read_next_batch()
-            else:
-                return next(self.reader)
+            batch = next(self.record_batch_reader)
+            batch = self._cast_view_types(batch, self._cast_fields)
+
+            if not self.missing_fields:
+                return batch
+
+            def _type_for_missing(name: str) -> pa.DataType:
+                if self._output_schema is not None:
+                    idx = self._output_schema.get_field_index(name)
+                    if idx >= 0:
+                        return self._output_schema.field(idx).type
+                return pa.null()
+
+            missing_columns = [
+                pa.nulls(batch.num_rows, type=_type_for_missing(name))
+                for name in self.missing_fields
+            ]
+
+            # Reconstruct the batch with all fields in the correct order
+            all_columns = []
+            out_fields = []
+            for field_name in self._read_field_names:
+                if field_name in self.existing_fields:
+                    column_idx = self.existing_fields.index(field_name)
+                    all_columns.append(batch.column(column_idx))
+                    out_fields.append(batch.schema.field(column_idx))
+                else:
+                    column_idx = self.missing_fields.index(field_name)
+                    col_type = _type_for_missing(field_name)
+                    all_columns.append(missing_columns[column_idx])
+                    nullable = not SpecialFields.is_system_field(field_name)
+                    out_fields.append(pa.field(field_name, col_type, 
nullable=nullable))
+            return pa.RecordBatch.from_arrays(all_columns, 
schema=pa.schema(out_fields))
+
         except StopIteration:
             return None
 
     def close(self):
-        if self.reader is not None:
-            self.reader = None
+        self.record_batch_reader = None
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index ea53c3cf96..920ff42317 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -129,7 +129,8 @@ class SplitRead(ABC):
         """Create a record reader for the given split."""
 
     def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
-                             read_fields: List[str], row_tracking_enabled: 
bool) -> RecordBatchReader:
+                             read_fields: List[str], row_tracking_enabled: 
bool,
+                             row_ranges=None) -> RecordBatchReader:
         (read_file_fields, read_arrow_predicate) = 
self._get_fields_and_predicate(file.schema_id, read_fields)
 
         # Use external_path if available, otherwise use file_path
@@ -138,6 +139,20 @@ class SplitRead(ABC):
 
         batch_size = self.table.options.read_batch_size()
 
+        # Compute effective row ranges and Vortex row_indices from row_ranges
+        row_indices = None
+        if row_ranges is not None:
+            effective_row_ranges = Range.and_(row_ranges, 
[file.row_id_range()])
+            if len(effective_row_ranges) == 0:
+                return EmptyRecordBatchReader()
+            if file_format == CoreOptions.FILE_FORMAT_VORTEX:
+                # Convert global row ranges to local indices for Vortex 
pushdown
+                row_indices = []
+                for r in effective_row_ranges:
+                    start = r.from_ - file.first_row_id
+                    end = r.to - file.first_row_id
+                    row_indices.extend(range(start, end + 1))
+
         format_reader: RecordBatchReader
         if file_format == CoreOptions.FILE_FORMAT_AVRO:
             format_reader = FormatAvroReader(self.table.file_io, file_path, 
read_file_fields,
@@ -151,8 +166,13 @@ class SplitRead(ABC):
             format_reader = FormatLanceReader(self.table.file_io, file_path, 
read_file_fields,
                                               read_arrow_predicate, 
batch_size=batch_size)
         elif file_format == CoreOptions.FILE_FORMAT_VORTEX:
-            format_reader = FormatVortexReader(self.table.file_io, file_path, 
read_file_fields,
-                                               read_arrow_predicate, 
batch_size=batch_size)
+            name_to_field = {f.name: f for f in self.read_fields}
+            ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
+            predicate_fields = _get_all_fields(self.push_down_predicate) if 
self.push_down_predicate else set()
+            format_reader = FormatVortexReader(self.table.file_io, file_path, 
ordered_read_fields,
+                                               read_arrow_predicate, 
batch_size=batch_size,
+                                               row_indices=row_indices,
+                                               
predicate_fields=predicate_fields)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == 
CoreOptions.FILE_FORMAT_ORC:
             name_to_field = {f.name: f for f in self.read_fields}
             ordered_read_fields = [name_to_field[n] for n in read_file_fields 
if n in name_to_field]
@@ -177,7 +197,7 @@ class SplitRead(ABC):
             if row_tracking_enabled else self.table.table_schema.fields
         )
         if for_merge_read:
-            return DataFileBatchReader(
+            reader = DataFileBatchReader(
                 format_reader,
                 index_mapping,
                 partition_info,
@@ -191,7 +211,7 @@ class SplitRead(ABC):
                 blob_descriptor_fields=blob_descriptor_fields,
                 file_io=self.table.file_io)
         else:
-            return DataFileBatchReader(
+            reader = DataFileBatchReader(
                 format_reader,
                 index_mapping,
                 partition_info,
@@ -205,6 +225,12 @@ class SplitRead(ABC):
                 blob_descriptor_fields=blob_descriptor_fields,
                 file_io=self.table.file_io)
 
+        # For non-Vortex formats, wrap with RowIdFilterRecordBatchReader
+        if row_ranges is not None and row_indices is None:
+            reader = RowIdFilterRecordBatchReader(reader, file.first_row_id, 
effective_row_ranges)
+
+        return reader
+
     def _get_fields_and_predicate(self, schema_id: int, read_fields):
         key = (schema_id, tuple(read_fields))
         if key not in self.schema_id_2_fields:
@@ -669,18 +695,12 @@ class DataEvolutionSplitRead(SplitRead):
 
     def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> 
Optional[RecordReader]:
         """Create a file reader for a single file."""
-        def create_record_reader():
-            return self.file_reader_supplier(
-                file=file,
-                for_merge_read=False,
-                read_fields=read_fields,
-                row_tracking_enabled=True)
-        if self.row_ranges is None:
-            return create_record_reader()
-        row_ranges = Range.and_(self.row_ranges, [file.row_id_range()])
-        if len(row_ranges) == 0:
-            return EmptyRecordBatchReader()
-        return RowIdFilterRecordBatchReader(create_record_reader(), 
file.first_row_id, row_ranges)
+        return self.file_reader_supplier(
+            file=file,
+            for_merge_read=False,
+            read_fields=read_fields,
+            row_tracking_enabled=True,
+            row_ranges=self.row_ranges)
 
     def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> 
List[FieldBunch]:
         """Split files into field bunches."""
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index 266041eec5..6da142fb75 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -17,6 +17,7 @@ limitations under the License.
 """
 import os
 import shutil
+import sys
 import tempfile
 import unittest
 from types import SimpleNamespace
@@ -1437,3 +1438,158 @@ class DataEvolutionTest(unittest.TestCase):
         ]))
         self.assertEqual(actual.num_rows, 2)
         self.assertEqual(actual, expect)
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    def test_vortex_basic(self):
+        """Test basic data evolution read/write with Vortex format."""
+        pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+            ('f2', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'file.format': 'vortex',
+            }
+        )
+        self.catalog.create_table('default.test_vortex_basic', schema, False)
+        table = self.catalog.get_table('default.test_vortex_basic')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: write f0, f1
+        w0 = write_builder.new_write().with_write_type(['f0', 'f1'])
+        w1 = write_builder.new_write().with_write_type(['f2'])
+        c = write_builder.new_commit()
+        d0 = pa.Table.from_pydict(
+            {'f0': [1, 2, 3], 'f1': ['a', 'b', 'c']},
+            schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())]))
+        d1 = pa.Table.from_pydict(
+            {'f2': ['x', 'y', 'z']},
+            schema=pa.schema([('f2', pa.string())]))
+        w0.write_arrow(d0)
+        w1.write_arrow(d1)
+        cmts = w0.prepare_commit() + w1.prepare_commit()
+        for msg in cmts:
+            for nf in msg.new_files:
+                nf.first_row_id = 0
+        c.commit(cmts)
+        w0.close()
+        w1.close()
+        c.close()
+
+        read_builder = table.new_read_builder()
+        actual = 
read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits())
+        expected = pa.Table.from_pydict({
+            'f0': [1, 2, 3],
+            'f1': ['a', 'b', 'c'],
+            'f2': ['x', 'y', 'z'],
+        }, schema=pa_schema)
+        self.assertEqual(actual, expected)
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    def test_vortex_row_id_filter(self):
+        """Test that Vortex row_indices pushdown works via 
file_reader_supplier row_ranges."""
+        pa_schema = pa.schema([
+            ('f0', pa.int32()),
+            ('f1', pa.string()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'file.format': 'vortex',
+            }
+        )
+        self.catalog.create_table('default.test_vortex_row_id_filter', schema, 
False)
+        table = self.catalog.get_table('default.test_vortex_row_id_filter')
+
+        write_builder = table.new_batch_write_builder()
+
+        # Commit 1: rows 0-4
+        w = write_builder.new_write()
+        c = write_builder.new_commit()
+        w.write_arrow(pa.Table.from_pydict(
+            {'f0': list(range(5)), 'f1': [f'v{i}' for i in range(5)]},
+            schema=pa_schema))
+        c.commit(w.prepare_commit())
+        w.close()
+        c.close()
+
+        # Commit 2: rows 5-9
+        w = write_builder.new_write()
+        c = write_builder.new_commit()
+        w.write_arrow(pa.Table.from_pydict(
+            {'f0': list(range(5, 10)), 'f1': [f'v{i}' for i in range(5, 10)]},
+            schema=pa_schema))
+        c.commit(w.prepare_commit())
+        w.close()
+        c.close()
+
+        # Full read
+        rb = table.new_read_builder()
+        full = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(full.num_rows, 10)
+
+        # Filter by _ROW_ID using predicate — triggers row_ranges pushdown in 
Vortex
+        rb_with_rowid = table.new_read_builder().with_projection(['f0', 'f1', 
'_ROW_ID'])
+        pb = rb_with_rowid.new_predicate_builder()
+        rb_filtered = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 
3))
+        filtered = 
rb_filtered.new_read().to_arrow(rb_filtered.new_scan().plan().splits())
+        self.assertEqual(filtered.num_rows, 1)
+        self.assertEqual(filtered.column('f0')[0].as_py(), 3)
+        self.assertEqual(filtered.column('f1')[0].as_py(), 'v3')
+
+        # Filter by _ROW_ID range spanning two files
+        rb_range = table.new_read_builder().with_filter(pb.between('_ROW_ID', 
3, 6))
+        range_result = 
rb_range.new_read().to_arrow(rb_range.new_scan().plan().splits())
+        self.assertEqual(range_result.num_rows, 4)
+        self.assertEqual(sorted(range_result.column('f0').to_pylist()), [3, 4, 
5, 6])
+
+    @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python 
>= 3.11")
+    def test_vortex_with_slice(self):
+        """Test with_slice on Vortex data evolution table."""
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('val', pa.int32()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={
+                'row-tracking.enabled': 'true',
+                'data-evolution.enabled': 'true',
+                'file.format': 'vortex',
+                'source.split.target-size': '512m',
+            }
+        )
+        self.catalog.create_table('default.test_vortex_with_slice', schema, 
False)
+        table = self.catalog.get_table('default.test_vortex_with_slice')
+
+        for batch in [
+            {'id': [1, 2], 'val': [10, 20]},
+            {'id': [3, 4], 'val': [30, 40]},
+            {'id': [5, 6], 'val': [50, 60]},
+        ]:
+            wb = table.new_batch_write_builder()
+            tw = wb.new_write()
+            tc = wb.new_commit()
+            tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema))
+            tc.commit(tw.prepare_commit())
+            tw.close()
+            tc.close()
+
+        rb = table.new_read_builder()
+
+        # Full read
+        full = rb.new_read().to_arrow(rb.new_scan().plan().splits())
+        self.assertEqual(full.num_rows, 6)
+
+        # with_slice(1, 4) -> rows at index 1,2,3 -> id in (2,3,4)
+        scan = rb.new_scan().with_slice(1, 4)
+        sliced = rb.new_read().to_arrow(scan.plan().splits())
+        self.assertEqual(sliced.num_rows, 3)
+        self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4])
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py 
b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 54749a9dd8..a8529c14a8 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -125,14 +125,48 @@ class AoReaderTest(unittest.TestCase):
         p1 = predicate_builder.less_than('user_id', 7)
         p2 = predicate_builder.greater_or_equal('user_id', 2)
         p3 = predicate_builder.between('user_id', 0, 6)  # [2/b, 3/c, 4/d, 
5/e, 6/f] left
-        p4 = predicate_builder.is_not_in('behavior', ['b', 'e'])  # [3/c, 4/d, 
6/f] left
-        p5 = predicate_builder.is_in('dt', ['p1'])  # exclude 3/c
-        p6 = predicate_builder.is_not_null('behavior')  # exclude 4/d
-        g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
+        p6 = predicate_builder.is_not_null('behavior')  # exclude 4/d -> [2/b, 
3/c, 5/e, 6/f]
+        g1 = predicate_builder.and_predicates([p1, p2, p3, p6])
         read_builder = table.new_read_builder().with_filter(g1)
         actual = self._read_test_table(read_builder)
         expected = pa.concat_tables([
-            self.expected.slice(5, 1)  # 6/f
+            self.expected.slice(1, 2),  # 2/b, 3/c
+            self.expected.slice(4, 2),  # 5/e, 6/f
+        ])
+        self.assertEqual(actual.sort_by('user_id'), expected)
+
+        # OR predicates with startswith, endswith, contains, equal, is_null
+        p7 = predicate_builder.startswith('behavior', 'a')
+        p10 = predicate_builder.equal('item_id', 1002)
+        p11 = predicate_builder.is_null('behavior')
+        p9 = predicate_builder.contains('behavior', 'f')
+        p8 = predicate_builder.endswith('dt', 'p2')
+        g2 = predicate_builder.or_predicates([p7, p8, p9, p10, p11])
+        read_builder = table.new_read_builder().with_filter(g2)
+        actual = self._read_test_table(read_builder)
+        self.assertEqual(actual.sort_by('user_id'), self.expected)
+
+        # Combined AND + OR
+        g3 = predicate_builder.and_predicates([g1, g2])
+        read_builder = table.new_read_builder().with_filter(g3)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected.slice(1, 2),  # 2/b, 3/c
+            self.expected.slice(4, 2),  # 5/e, 6/f
+        ])
+        self.assertEqual(actual.sort_by('user_id'), expected)
+
+        # not_equal also filters None values
+        p12 = predicate_builder.not_equal('behavior', 'f')
+        read_builder = table.new_read_builder().with_filter(p12)
+        actual = self._read_test_table(read_builder)
+        expected = pa.concat_tables([
+            self.expected.slice(0, 1),  # 1/a
+            self.expected.slice(1, 1),  # 2/b
+            self.expected.slice(2, 1),  # 3/c
+            self.expected.slice(4, 1),  # 5/e
+            self.expected.slice(6, 1),  # 7/g
+            self.expected.slice(7, 1),  # 8/h
         ])
         self.assertEqual(actual.sort_by('user_id'), expected)
 


Reply via email to