This is an automated email from the ASF dual-hosted git repository. hope pushed a commit to branch release-1.4 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 73825eb63184fda93ced18abeb6ac69f2e2afd8f Author: Jingsong Lee <[email protected]> AuthorDate: Tue Mar 31 16:45:53 2026 +0800 [python] Push down row ranges to vortex reader (#7558) Push the row ranges down to the Vortex reader layer and use vortex_file.scan(indices=...) for native row filtering to avoid filtering after full read. There is no problem with the direction, and the performance benefits are significant. --- .../pypaimon/benchmark/clickbench_format.py | 374 +++++++++++++++++++++ .../pypaimon/read/reader/format_vortex_reader.py | 130 +++++-- paimon-python/pypaimon/read/split_read.py | 54 ++- .../pypaimon/tests/data_evolution_test.py | 156 +++++++++ .../pypaimon/tests/reader_append_only_test.py | 44 ++- 5 files changed, 702 insertions(+), 56 deletions(-) diff --git a/paimon-python/pypaimon/benchmark/clickbench_format.py b/paimon-python/pypaimon/benchmark/clickbench_format.py new file mode 100644 index 0000000000..9f4bc5c22c --- /dev/null +++ b/paimon-python/pypaimon/benchmark/clickbench_format.py @@ -0,0 +1,374 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ + +""" +Clickbench benchmark for Paimon file formats. + +Downloads the Clickbench hits.parquet dataset and compares compression ratios +and read performance across Parquet, ORC, and Vortex formats in Paimon tables. + +Usage: + python benchmarks/clickbench_format.py [--rows N] [--data-path PATH] + + --rows N Number of rows to use (default: 1_000_000, use 0 for full dataset) + --data-path PATH Path to existing hits.parquet (skips download if provided) +""" + +import argparse +import os +import random +import shutil +import sys +import tempfile +import time +from pathlib import Path + +import pyarrow as pa +import pyarrow.parquet as pq + +# Add project root to path +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + +from pypaimon import CatalogFactory, Schema +from pypaimon.globalindex.indexed_split import IndexedSplit +from pypaimon.utils.range import Range + +CLICKBENCH_URL = "https://datasets.clickhouse.com/hits_compatible/hits.parquet" + +# Formats to benchmark +FORMATS = ["parquet", "orc", "lance", "vortex"] + + +def download_clickbench(dest_path: str): + """Download Clickbench hits.parquet if not already present.""" + if os.path.exists(dest_path): + print(f"[INFO] Using cached dataset: {dest_path}") + return + print(f"[INFO] Downloading Clickbench dataset to {dest_path} ...") + print(f"[INFO] URL: {CLICKBENCH_URL}") + print("[INFO] This is ~14GB, may take a while.") + + import urllib.request + urllib.request.urlretrieve(CLICKBENCH_URL, dest_path) + print("[INFO] Download complete.") + + +def load_data(data_path: str, max_rows: int) -> pa.Table: + """Load Clickbench parquet data, optionally limiting rows.""" + print(f"[INFO] Loading data from {data_path} ...") + pf = pq.ParquetFile(data_path) + if max_rows > 0: + # Read enough row groups to satisfy max_rows + batches = [] + total = 0 + for rg_idx in range(pf.metadata.num_row_groups): + rg = pf.read_row_group(rg_idx) 
+ batches.append(rg) + total += rg.num_rows + if total >= max_rows: + break + table = pa.concat_tables(batches) + if table.num_rows > max_rows: + table = table.slice(0, max_rows) + else: + table = pf.read() + + # Cast unsigned integer types to signed (Paimon doesn't support unsigned) + uint_to_int = { + pa.uint8(): pa.int16(), + pa.uint16(): pa.int32(), + pa.uint32(): pa.int64(), + pa.uint64(): pa.int64(), + } + new_fields = [] + for field in table.schema: + if field.type in uint_to_int: + new_fields.append((field.name, uint_to_int[field.type])) + if new_fields: + print(f"[INFO] Casting {len(new_fields)} unsigned int columns to signed") + for name, target_type in new_fields: + table = table.set_column( + table.schema.get_field_index(name), + pa.field(name, target_type, nullable=table.schema.field(name).nullable), + table.column(name).cast(target_type), + ) + + print(f"[INFO] Loaded {table.num_rows:,} rows, {table.num_columns} columns") + print(f"[INFO] In-memory size: {table.nbytes / 1024 / 1024:.1f} MB") + return table + + +def get_dir_size(path: str) -> int: + """Get total size of all files under a directory.""" + total = 0 + for dirpath, _, filenames in os.walk(path): + for f in filenames: + fp = os.path.join(dirpath, f) + if os.path.isfile(fp): + total += os.path.getsize(fp) + return total + + +def write_paimon_table(catalog, table_name: str, pa_schema: pa.Schema, + data: pa.Table, file_format: str) -> dict: + """Write data to a Paimon table and return metrics.""" + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'file.format': file_format, + 'data-evolution.enabled': 'true', + 'row-tracking.enabled': 'true', + }) + catalog.create_table(f'default.{table_name}', schema, False) + table = catalog.get_table(f'default.{table_name}') + + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + + t0 = time.time() + table_write.write_arrow(data) + 
table_commit.commit(table_write.prepare_commit()) + write_time = time.time() - t0 + + table_write.close() + table_commit.close() + + return { + 'write_time': write_time, + } + + +def read_paimon_table(catalog, table_name: str) -> dict: + """Read all data from a Paimon table and return metrics.""" + table = catalog.get_table(f'default.{table_name}') + read_builder = table.new_read_builder() + table_scan = read_builder.new_scan() + table_read = read_builder.new_read() + + t0 = time.time() + result = table_read.to_arrow(table_scan.plan().splits()) + read_time = time.time() - t0 + + return { + 'read_time': read_time, + 'num_rows': result.num_rows, + } + + +def point_lookup_paimon_table(catalog, table_name: str, num_lookups: int, rows_per_lookup: int) -> dict: + """Run row-ID-based point lookups on a data-evolution Paimon table.""" + table = catalog.get_table(f'default.{table_name}') + read_builder = table.new_read_builder() + scan = read_builder.new_scan() + splits = scan.plan().splits() + + # Determine total row range from splits + total_rows = sum(s.row_count for s in splits) + + # Generate random row ID ranges for lookups + random.seed(42) + lookup_ranges = [] + for _ in range(num_lookups): + start = random.randint(0, total_rows - rows_per_lookup) + lookup_ranges.append(Range(start, start + rows_per_lookup - 1)) + + total_result_rows = 0 + t0 = time.time() + for rng in lookup_ranges: + indexed_splits = [] + for s in splits: + # Intersect the lookup range with each split's file ranges + file_ranges = [] + for f in s.files: + if f.first_row_id is not None: + file_ranges.append(Range(f.first_row_id, f.first_row_id + f.row_count - 1)) + split_range = Range.and_([rng], file_ranges) + if split_range: + indexed_splits.append(IndexedSplit(s, [rng])) + + if indexed_splits: + read = read_builder.new_read() + result = read.to_arrow(indexed_splits) + total_result_rows += result.num_rows + + lookup_time = time.time() - t0 + + return { + 'lookup_time': lookup_time, + 
'num_lookups': num_lookups, + 'total_rows': total_result_rows, + } + + +def predicate_lookup_paimon_table(catalog, table_name: str, num_lookups: int, rows_per_lookup: int) -> dict: + """Run predicate-based point lookups using _ROW_ID filter on a data-evolution Paimon table.""" + table = catalog.get_table(f'default.{table_name}') + + # Get total rows to generate random ranges + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + total_rows = sum(s.row_count for s in splits) + + # Build predicate builder with _ROW_ID in projection + all_cols = [f.name for f in table.fields] + ['_ROW_ID'] + pb = table.new_read_builder().with_projection(all_cols).new_predicate_builder() + + random.seed(42) + total_result_rows = 0 + t0 = time.time() + for _ in range(num_lookups): + start = random.randint(0, total_rows - rows_per_lookup) + end = start + rows_per_lookup - 1 + pred = pb.between('_ROW_ID', start, end) + filtered_rb = table.new_read_builder().with_filter(pred) + scan = filtered_rb.new_scan() + read = filtered_rb.new_read() + result = read.to_arrow(scan.plan().splits()) + total_result_rows += result.num_rows + + lookup_time = time.time() - t0 + + return { + 'lookup_time': lookup_time, + 'num_lookups': num_lookups, + 'total_rows': total_result_rows, + } + + +def run_benchmark(data: pa.Table, warehouse_dir: str): + """Run the compression benchmark across all formats.""" + catalog = CatalogFactory.create({'warehouse': warehouse_dir}) + catalog.create_database('default', True) + + pa_schema = data.schema + in_memory_mb = data.nbytes / 1024 / 1024 + results = {} + + num_lookups = 20 + rows_per_lookup = 100 + + for fmt in FORMATS: + table_name = f"clickbench_{fmt}" + print(f"\n{'='*60}") + print(f" Format: {fmt.upper()}") + print(f"{'='*60}") + + # Write + print(f" Writing {data.num_rows:,} rows ...") + write_metrics = write_paimon_table(catalog, table_name, pa_schema, data, fmt) + print(f" Write time: {write_metrics['write_time']:.2f}s") + + # Measure on-disk 
size + table = catalog.get_table(f'default.{table_name}') + table_path = table.table_path + disk_size = get_dir_size(table_path) + disk_mb = disk_size / 1024 / 1024 + ratio = in_memory_mb / disk_mb if disk_mb > 0 else 0 + print(f" On-disk size: {disk_mb:.1f} MB (ratio: {ratio:.2f}x)") + + # Full read + print(" Reading back ...") + read_metrics = read_paimon_table(catalog, table_name) + print(f" Read time: {read_metrics['read_time']:.2f}s") + print(f" Rows read: {read_metrics['num_rows']:,}") + + # Point lookups by row ID + print(f" Point lookups ({num_lookups} queries, {rows_per_lookup} rows each) ...") + lookup_metrics = point_lookup_paimon_table(catalog, table_name, num_lookups, rows_per_lookup) + print(f" Lookup time: {lookup_metrics['lookup_time']:.2f}s") + print(f" Total rows matched: {lookup_metrics['total_rows']:,}") + + # Predicate-based point lookups by _ROW_ID + print(f" Predicate lookups ({num_lookups} queries, {rows_per_lookup} rows each) ...") + pred_metrics = predicate_lookup_paimon_table(catalog, table_name, num_lookups, rows_per_lookup) + print(f" Predicate lookup time: {pred_metrics['lookup_time']:.2f}s") + print(f" Total rows matched: {pred_metrics['total_rows']:,}") + + results[fmt] = { + 'write_time': write_metrics['write_time'], + 'read_time': read_metrics['read_time'], + 'disk_mb': disk_mb, + 'ratio': ratio, + 'lookup_time': lookup_metrics['lookup_time'], + 'lookup_rows': lookup_metrics['total_rows'], + 'pred_lookup_time': pred_metrics['lookup_time'], + 'pred_lookup_rows': pred_metrics['total_rows'], + } + + return results + + +def print_summary(results: dict, in_memory_mb: float, num_rows: int): + """Print a summary comparison table.""" + print(f"\n{'='*80}") + print(" CLICKBENCH COMPRESSION BENCHMARK SUMMARY") + print(f" Rows: {num_rows:,} | In-memory: {in_memory_mb:.1f} MB") + print(f"{'='*80}") + print(f" {'Format':<10} {'Disk (MB)':>10} {'Ratio':>8} {'Write (s)':>10} " + f"{'Read (s)':>10} {'Lookup (s)':>11} {'Pred (s)':>10}") + print(f" 
{'-'*69}") + + for fmt in FORMATS: + if fmt in results: + r = results[fmt] + print(f" {fmt:<10} {r['disk_mb']:>10.1f} {r['ratio']:>7.2f}x {r['write_time']:>10.2f} " + f"{r['read_time']:>10.2f} {r['lookup_time']:>11.2f} {r['pred_lookup_time']:>10.2f}") + + # Vortex vs Parquet comparison + if 'vortex' in results and 'parquet' in results: + v = results['vortex'] + p = results['parquet'] + print("\n Vortex vs Parquet:") + print(f" Size: {v['disk_mb'] / p['disk_mb'] * 100:.1f}% of Parquet") + print(f" Write: {v['write_time'] / p['write_time']:.2f}x") + print(f" Read: {v['read_time'] / p['read_time']:.2f}x") + print(f" Lookup: {v['lookup_time'] / p['lookup_time']:.2f}x") + print(f" Pred: {v['pred_lookup_time'] / p['pred_lookup_time']:.2f}x") + + +def main(): + parser = argparse.ArgumentParser(description="Clickbench compression benchmark for Paimon") + parser.add_argument("--rows", type=int, default=3_000_000, + help="Number of rows to use (0 = full dataset, default: 3000000)") + parser.add_argument("--data-path", type=str, default=None, + help="Path to existing hits.parquet (skips download)") + args = parser.parse_args() + + # Resolve data path + if args.data_path: + data_path = args.data_path + else: + cache_dir = os.path.join(Path.home(), ".cache", "paimon-bench") + os.makedirs(cache_dir, exist_ok=True) + data_path = os.path.join(cache_dir, "hits.parquet") + download_clickbench(data_path) + + data = load_data(data_path, args.rows) + in_memory_mb = data.nbytes / 1024 / 1024 + + warehouse_dir = tempfile.mkdtemp(prefix="paimon_bench_") + print(f"[INFO] Warehouse: {warehouse_dir}") + + try: + results = run_benchmark(data, warehouse_dir) + print_summary(results, in_memory_mb, data.num_rows) + finally: + shutil.rmtree(warehouse_dir, ignore_errors=True) + + +if __name__ == "__main__": + main() diff --git a/paimon-python/pypaimon/read/reader/format_vortex_reader.py b/paimon-python/pypaimon/read/reader/format_vortex_reader.py index d0bf709e77..fd8f5d8dab 100644 --- 
a/paimon-python/pypaimon/read/reader/format_vortex_reader.py +++ b/paimon-python/pypaimon/read/reader/format_vortex_reader.py @@ -16,14 +16,15 @@ # limitations under the License. ################################################################################ -from typing import List, Optional, Any +from typing import List, Optional, Any, Set import pyarrow as pa -import pyarrow.dataset as ds from pyarrow import RecordBatch from pypaimon.common.file_io import FileIO from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.schema.data_types import DataField, PyarrowFieldParser +from pypaimon.table.special_fields import SpecialFields class FormatVortexReader(RecordBatchReader): @@ -32,8 +33,10 @@ class FormatVortexReader(RecordBatchReader): and filters it based on the provided predicate and projection. """ - def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], - push_down_predicate: Any, batch_size: int = 1024): + def __init__(self, file_io: FileIO, file_path: str, read_fields: List[DataField], + push_down_predicate: Any, batch_size: int = 1024, + row_indices: Optional[Any] = None, + predicate_fields: Optional[Set[str]] = None): import vortex from pypaimon.read.reader.vortex_utils import to_vortex_specified @@ -46,45 +49,104 @@ class FormatVortexReader(RecordBatchReader): else: vortex_file = vortex.open(file_path_for_vortex) - columns_for_vortex = read_fields if read_fields else None - pa_table = vortex_file.to_arrow(columns_for_vortex).read_all() + self.read_fields = read_fields + self._read_field_names = [f.name for f in read_fields] - # Vortex exports string_view which some PyArrow kernels don't support yet. 
- pa_table = self._cast_string_view_columns(pa_table) + # Identify which fields exist in the file and which are missing + file_schema_names = set(vortex_file.dtype.to_arrow_schema().names) + self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names] + self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names] + columns_for_vortex = self.existing_fields if self.existing_fields else None + + # Try to convert Arrow predicate to Vortex expr for native push-down + vortex_expr = None if push_down_predicate is not None: - in_memory_dataset = ds.InMemoryDataset(pa_table) - scanner = in_memory_dataset.scanner(filter=push_down_predicate, batch_size=batch_size) - self.reader = scanner.to_reader() - else: - self.reader = iter(pa_table.to_batches(max_chunksize=batch_size)) + try: + from vortex.arrow.expression import arrow_to_vortex + arrow_schema = vortex_file.dtype.to_arrow_schema() + vortex_expr = arrow_to_vortex(push_down_predicate, arrow_schema) + except Exception: + pass + + indices = None + if row_indices is not None: + indices = vortex.array(row_indices) + + self.record_batch_reader = vortex_file.scan( + columns_for_vortex, expr=vortex_expr, indices=indices, batch_size=batch_size).to_arrow() + + self._output_schema = ( + PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields else None + ) + + # Collect predicate-referenced fields for targeted view type casting + self._cast_fields = predicate_fields if predicate_fields and vortex_expr is not None else set() @staticmethod - def _cast_string_view_columns(table: pa.Table) -> pa.Table: - new_fields = [] - needs_cast = False - for field in table.schema: - if field.type == pa.string_view(): - new_fields.append(field.with_type(pa.utf8())) - needs_cast = True - elif field.type == pa.binary_view(): - new_fields.append(field.with_type(pa.binary())) - needs_cast = True - else: - new_fields.append(field) - if not needs_cast: - return table - return 
table.cast(pa.schema(new_fields)) + def _cast_view_types(batch: RecordBatch, target_fields: Set[str]) -> RecordBatch: + """Cast string_view/binary_view columns to string/binary, only for target fields.""" + if not target_fields: + return batch + columns = [] + fields = [] + changed = False + for i in range(batch.num_columns): + col = batch.column(i) + field = batch.schema.field(i) + if field.name in target_fields: + if col.type == pa.string_view(): + col = col.cast(pa.utf8()) + field = field.with_type(pa.utf8()) + changed = True + elif col.type == pa.binary_view(): + col = col.cast(pa.binary()) + field = field.with_type(pa.binary()) + changed = True + columns.append(col) + fields.append(field) + if changed: + return pa.RecordBatch.from_arrays(columns, schema=pa.schema(fields)) + return batch def read_arrow_batch(self) -> Optional[RecordBatch]: try: - if hasattr(self.reader, 'read_next_batch'): - return self.reader.read_next_batch() - else: - return next(self.reader) + batch = next(self.record_batch_reader) + batch = self._cast_view_types(batch, self._cast_fields) + + if not self.missing_fields: + return batch + + def _type_for_missing(name: str) -> pa.DataType: + if self._output_schema is not None: + idx = self._output_schema.get_field_index(name) + if idx >= 0: + return self._output_schema.field(idx).type + return pa.null() + + missing_columns = [ + pa.nulls(batch.num_rows, type=_type_for_missing(name)) + for name in self.missing_fields + ] + + # Reconstruct the batch with all fields in the correct order + all_columns = [] + out_fields = [] + for field_name in self._read_field_names: + if field_name in self.existing_fields: + column_idx = self.existing_fields.index(field_name) + all_columns.append(batch.column(column_idx)) + out_fields.append(batch.schema.field(column_idx)) + else: + column_idx = self.missing_fields.index(field_name) + col_type = _type_for_missing(field_name) + all_columns.append(missing_columns[column_idx]) + nullable = not 
SpecialFields.is_system_field(field_name) + out_fields.append(pa.field(field_name, col_type, nullable=nullable)) + return pa.RecordBatch.from_arrays(all_columns, schema=pa.schema(out_fields)) + except StopIteration: return None def close(self): - if self.reader is not None: - self.reader = None + self.record_batch_reader = None diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index ea53c3cf96..920ff42317 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -129,7 +129,8 @@ class SplitRead(ABC): """Create a record reader for the given split.""" def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, - read_fields: List[str], row_tracking_enabled: bool) -> RecordBatchReader: + read_fields: List[str], row_tracking_enabled: bool, + row_ranges=None) -> RecordBatchReader: (read_file_fields, read_arrow_predicate) = self._get_fields_and_predicate(file.schema_id, read_fields) # Use external_path if available, otherwise use file_path @@ -138,6 +139,20 @@ class SplitRead(ABC): batch_size = self.table.options.read_batch_size() + # Compute effective row ranges and Vortex row_indices from row_ranges + row_indices = None + if row_ranges is not None: + effective_row_ranges = Range.and_(row_ranges, [file.row_id_range()]) + if len(effective_row_ranges) == 0: + return EmptyRecordBatchReader() + if file_format == CoreOptions.FILE_FORMAT_VORTEX: + # Convert global row ranges to local indices for Vortex pushdown + row_indices = [] + for r in effective_row_ranges: + start = r.from_ - file.first_row_id + end = r.to - file.first_row_id + row_indices.extend(range(start, end + 1)) + format_reader: RecordBatchReader if file_format == CoreOptions.FILE_FORMAT_AVRO: format_reader = FormatAvroReader(self.table.file_io, file_path, read_file_fields, @@ -151,8 +166,13 @@ class SplitRead(ABC): format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields, 
read_arrow_predicate, batch_size=batch_size) elif file_format == CoreOptions.FILE_FORMAT_VORTEX: - format_reader = FormatVortexReader(self.table.file_io, file_path, read_file_fields, - read_arrow_predicate, batch_size=batch_size) + name_to_field = {f.name: f for f in self.read_fields} + ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] + predicate_fields = _get_all_fields(self.push_down_predicate) if self.push_down_predicate else set() + format_reader = FormatVortexReader(self.table.file_io, file_path, ordered_read_fields, + read_arrow_predicate, batch_size=batch_size, + row_indices=row_indices, + predicate_fields=predicate_fields) elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC: name_to_field = {f.name: f for f in self.read_fields} ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field] @@ -177,7 +197,7 @@ class SplitRead(ABC): if row_tracking_enabled else self.table.table_schema.fields ) if for_merge_read: - return DataFileBatchReader( + reader = DataFileBatchReader( format_reader, index_mapping, partition_info, @@ -191,7 +211,7 @@ class SplitRead(ABC): blob_descriptor_fields=blob_descriptor_fields, file_io=self.table.file_io) else: - return DataFileBatchReader( + reader = DataFileBatchReader( format_reader, index_mapping, partition_info, @@ -205,6 +225,12 @@ class SplitRead(ABC): blob_descriptor_fields=blob_descriptor_fields, file_io=self.table.file_io) + # For non-Vortex formats, wrap with RowIdFilterRecordBatchReader + if row_ranges is not None and row_indices is None: + reader = RowIdFilterRecordBatchReader(reader, file.first_row_id, effective_row_ranges) + + return reader + def _get_fields_and_predicate(self, schema_id: int, read_fields): key = (schema_id, tuple(read_fields)) if key not in self.schema_id_2_fields: @@ -669,18 +695,12 @@ class DataEvolutionSplitRead(SplitRead): def _create_file_reader(self, file: DataFileMeta, read_fields: 
[str]) -> Optional[RecordReader]: """Create a file reader for a single file.""" - def create_record_reader(): - return self.file_reader_supplier( - file=file, - for_merge_read=False, - read_fields=read_fields, - row_tracking_enabled=True) - if self.row_ranges is None: - return create_record_reader() - row_ranges = Range.and_(self.row_ranges, [file.row_id_range()]) - if len(row_ranges) == 0: - return EmptyRecordBatchReader() - return RowIdFilterRecordBatchReader(create_record_reader(), file.first_row_id, row_ranges) + return self.file_reader_supplier( + file=file, + for_merge_read=False, + read_fields=read_fields, + row_tracking_enabled=True, + row_ranges=self.row_ranges) def _split_field_bunches(self, need_merge_files: List[DataFileMeta]) -> List[FieldBunch]: """Split files into field bunches.""" diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py b/paimon-python/pypaimon/tests/data_evolution_test.py index 266041eec5..6da142fb75 100644 --- a/paimon-python/pypaimon/tests/data_evolution_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_test.py @@ -17,6 +17,7 @@ limitations under the License. 
""" import os import shutil +import sys import tempfile import unittest from types import SimpleNamespace @@ -1437,3 +1438,158 @@ class DataEvolutionTest(unittest.TestCase): ])) self.assertEqual(actual.num_rows, 2) self.assertEqual(actual, expect) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + def test_vortex_basic(self): + """Test basic data evolution read/write with Vortex format.""" + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ('f2', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + } + ) + self.catalog.create_table('default.test_vortex_basic', schema, False) + table = self.catalog.get_table('default.test_vortex_basic') + + write_builder = table.new_batch_write_builder() + + # Commit 1: write f0, f1 + w0 = write_builder.new_write().with_write_type(['f0', 'f1']) + w1 = write_builder.new_write().with_write_type(['f2']) + c = write_builder.new_commit() + d0 = pa.Table.from_pydict( + {'f0': [1, 2, 3], 'f1': ['a', 'b', 'c']}, + schema=pa.schema([('f0', pa.int32()), ('f1', pa.string())])) + d1 = pa.Table.from_pydict( + {'f2': ['x', 'y', 'z']}, + schema=pa.schema([('f2', pa.string())])) + w0.write_arrow(d0) + w1.write_arrow(d1) + cmts = w0.prepare_commit() + w1.prepare_commit() + for msg in cmts: + for nf in msg.new_files: + nf.first_row_id = 0 + c.commit(cmts) + w0.close() + w1.close() + c.close() + + read_builder = table.new_read_builder() + actual = read_builder.new_read().to_arrow(read_builder.new_scan().plan().splits()) + expected = pa.Table.from_pydict({ + 'f0': [1, 2, 3], + 'f1': ['a', 'b', 'c'], + 'f2': ['x', 'y', 'z'], + }, schema=pa_schema) + self.assertEqual(actual, expected) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + def test_vortex_row_id_filter(self): + """Test that Vortex row_indices pushdown works via 
file_reader_supplier row_ranges.""" + pa_schema = pa.schema([ + ('f0', pa.int32()), + ('f1', pa.string()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + } + ) + self.catalog.create_table('default.test_vortex_row_id_filter', schema, False) + table = self.catalog.get_table('default.test_vortex_row_id_filter') + + write_builder = table.new_batch_write_builder() + + # Commit 1: rows 0-4 + w = write_builder.new_write() + c = write_builder.new_commit() + w.write_arrow(pa.Table.from_pydict( + {'f0': list(range(5)), 'f1': [f'v{i}' for i in range(5)]}, + schema=pa_schema)) + c.commit(w.prepare_commit()) + w.close() + c.close() + + # Commit 2: rows 5-9 + w = write_builder.new_write() + c = write_builder.new_commit() + w.write_arrow(pa.Table.from_pydict( + {'f0': list(range(5, 10)), 'f1': [f'v{i}' for i in range(5, 10)]}, + schema=pa_schema)) + c.commit(w.prepare_commit()) + w.close() + c.close() + + # Full read + rb = table.new_read_builder() + full = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(full.num_rows, 10) + + # Filter by _ROW_ID using predicate — triggers row_ranges pushdown in Vortex + rb_with_rowid = table.new_read_builder().with_projection(['f0', 'f1', '_ROW_ID']) + pb = rb_with_rowid.new_predicate_builder() + rb_filtered = table.new_read_builder().with_filter(pb.equal('_ROW_ID', 3)) + filtered = rb_filtered.new_read().to_arrow(rb_filtered.new_scan().plan().splits()) + self.assertEqual(filtered.num_rows, 1) + self.assertEqual(filtered.column('f0')[0].as_py(), 3) + self.assertEqual(filtered.column('f1')[0].as_py(), 'v3') + + # Filter by _ROW_ID range spanning two files + rb_range = table.new_read_builder().with_filter(pb.between('_ROW_ID', 3, 6)) + range_result = rb_range.new_read().to_arrow(rb_range.new_scan().plan().splits()) + self.assertEqual(range_result.num_rows, 4) + 
self.assertEqual(sorted(range_result.column('f0').to_pylist()), [3, 4, 5, 6]) + + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") + def test_vortex_with_slice(self): + """Test with_slice on Vortex data evolution table.""" + pa_schema = pa.schema([ + ('id', pa.int64()), + ('val', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'file.format': 'vortex', + 'source.split.target-size': '512m', + } + ) + self.catalog.create_table('default.test_vortex_with_slice', schema, False) + table = self.catalog.get_table('default.test_vortex_with_slice') + + for batch in [ + {'id': [1, 2], 'val': [10, 20]}, + {'id': [3, 4], 'val': [30, 40]}, + {'id': [5, 6], 'val': [50, 60]}, + ]: + wb = table.new_batch_write_builder() + tw = wb.new_write() + tc = wb.new_commit() + tw.write_arrow(pa.Table.from_pydict(batch, schema=pa_schema)) + tc.commit(tw.prepare_commit()) + tw.close() + tc.close() + + rb = table.new_read_builder() + + # Full read + full = rb.new_read().to_arrow(rb.new_scan().plan().splits()) + self.assertEqual(full.num_rows, 6) + + # with_slice(1, 4) -> rows at index 1,2,3 -> id in (2,3,4) + scan = rb.new_scan().with_slice(1, 4) + sliced = rb.new_read().to_arrow(scan.plan().splits()) + self.assertEqual(sliced.num_rows, 3) + self.assertEqual(sorted(sliced.column('id').to_pylist()), [2, 3, 4]) diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py index 54749a9dd8..a8529c14a8 100644 --- a/paimon-python/pypaimon/tests/reader_append_only_test.py +++ b/paimon-python/pypaimon/tests/reader_append_only_test.py @@ -125,14 +125,48 @@ class AoReaderTest(unittest.TestCase): p1 = predicate_builder.less_than('user_id', 7) p2 = predicate_builder.greater_or_equal('user_id', 2) p3 = predicate_builder.between('user_id', 0, 6) # [2/b, 3/c, 4/d, 5/e, 6/f] left - p4 = 
predicate_builder.is_not_in('behavior', ['b', 'e']) # [3/c, 4/d, 6/f] left - p5 = predicate_builder.is_in('dt', ['p1']) # exclude 3/c - p6 = predicate_builder.is_not_null('behavior') # exclude 4/d - g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6]) + p6 = predicate_builder.is_not_null('behavior') # exclude 4/d -> [2/b, 3/c, 5/e, 6/f] + g1 = predicate_builder.and_predicates([p1, p2, p3, p6]) read_builder = table.new_read_builder().with_filter(g1) actual = self._read_test_table(read_builder) expected = pa.concat_tables([ - self.expected.slice(5, 1) # 6/f + self.expected.slice(1, 2), # 2/b, 3/c + self.expected.slice(4, 2), # 5/e, 6/f + ]) + self.assertEqual(actual.sort_by('user_id'), expected) + + # OR predicates with startswith, endswith, contains, equal, is_null + p7 = predicate_builder.startswith('behavior', 'a') + p10 = predicate_builder.equal('item_id', 1002) + p11 = predicate_builder.is_null('behavior') + p9 = predicate_builder.contains('behavior', 'f') + p8 = predicate_builder.endswith('dt', 'p2') + g2 = predicate_builder.or_predicates([p7, p8, p9, p10, p11]) + read_builder = table.new_read_builder().with_filter(g2) + actual = self._read_test_table(read_builder) + self.assertEqual(actual.sort_by('user_id'), self.expected) + + # Combined AND + OR + g3 = predicate_builder.and_predicates([g1, g2]) + read_builder = table.new_read_builder().with_filter(g3) + actual = self._read_test_table(read_builder) + expected = pa.concat_tables([ + self.expected.slice(1, 2), # 2/b, 3/c + self.expected.slice(4, 2), # 5/e, 6/f + ]) + self.assertEqual(actual.sort_by('user_id'), expected) + + # not_equal also filters None values + p12 = predicate_builder.not_equal('behavior', 'f') + read_builder = table.new_read_builder().with_filter(p12) + actual = self._read_test_table(read_builder) + expected = pa.concat_tables([ + self.expected.slice(0, 1), # 1/a + self.expected.slice(1, 1), # 2/b + self.expected.slice(2, 1), # 3/c + self.expected.slice(4, 1), # 5/e + 
self.expected.slice(6, 1), # 7/g + self.expected.slice(7, 1), # 8/h ]) self.assertEqual(actual.sort_by('user_id'), expected)
