This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2d3cadba16 [python] Add three-layer predicate pushdown correctness
regression tests (#7805)
2d3cadba16 is described below
commit 2d3cadba16d5d3153d737c3b1072bdf046a41c03
Author: chaoyang <[email protected]>
AuthorDate: Tue May 12 17:49:39 2026 +0800
[python] Add three-layer predicate pushdown correctness regression tests
(#7805)
Predicate pushdown in pypaimon spans three correctness-sensitive layers
(filter / file index / value stats) plus a couple of subtle invariants
around PK tables and post-merge value filtering. The correctness
obligations have so far been enforced by spot tests in a handful of
unrelated files; a regression that re-introduces a
``value_stats``-vs-``key_stats`` confusion or a per-file value filter
on PK rows would be hard to detect without targeted coverage.
## Effect
Adds a single test file that locks in the invariants as a directed
regression suite:
- **PK manifest pruning consults ``key_stats`` only** — covered by both
the value-only-predicate early-return path and the compound
(``id`` + ``val``) path, in each case using ``value_stats`` that
would drop the file if consulted.
- **Reader-level value predicate is post-merge** — overwriting a key
with a newer value that fails the predicate (e.g. ``val <= 5``) must
not resurrect the older, matching value.
- **Append-only ``value_stats`` pruning** is applied and correct.
Three layers of coverage:
1. **Unit** — synthetic ``ManifestEntry`` through
``FileScanner._filter_manifest_entry`` to assert the manifest
gate's exact behaviour without relying on full I/O.
2. **Round-trip** — real catalog writes/reads with a post-merge oracle.
3. **Property** — seeded random datasets + 12 random predicate ops
(incl. ``is_null`` / ``is_not_null``, aimed at a historical bug where
missing ``null_counts`` caused ``isNull`` to drop every file; note that
the generated data contains no nulls, so ``is_null`` alone can only
confirm an empty result). No ``hypothesis`` dependency, keeps the
Python 3.6 compatibility contract intact.
No production code changes — pure regression coverage. 11 new test
cases; full suite passes locally.
---
.../pypaimon/tests/pushdown_correctness_test.py | 681 +++++++++++++++++++++
1 file changed, 681 insertions(+)
diff --git a/paimon-python/pypaimon/tests/pushdown_correctness_test.py
b/paimon-python/pypaimon/tests/pushdown_correctness_test.py
new file mode 100644
index 0000000000..92f8b08a03
--- /dev/null
+++ b/paimon-python/pypaimon/tests/pushdown_correctness_test.py
@@ -0,0 +1,681 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Predicate pushdown correctness regression suite.
+
+Locks in the invariants:
+
+ - PK tables: manifest stats pruning consults *key_stats only*. A
+ value-only predicate must short-circuit to "keep the file" even if
+ value_stats would say otherwise.
+ - Reader-level value predicate is applied *after* merge — covered rows
+ cannot resurface even when an older file passes manifest pruning.
+ - Append-only tables: value_stats pruning is false-positive-safe (may
+ over-include) but never false-negative (never drops a live row).
+
+Three layers:
+
+ 1. Unit — synthetic ManifestEntry to assert the manifest gate's exact
+ behaviour without relying on full I/O.
+ 2. Round-trip — write real snapshots, read back through the public API,
+ compare to the source-of-truth (post-merge oracle).
+ 3. Property — random datasets + random predicates, asserting that the
+ pushed-down result equals the full-scan-then-filter oracle.
+ (Deterministic random; no hypothesis dependency, keeps the
+ Python 3.6 compatibility contract intact.)
+"""
+
+import os
+import random
+import shutil
+import tempfile
+import unittest
+from typing import Any, Dict, List, Optional, Tuple
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.predicate import Predicate
+from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.manifest.schema.simple_stats import SimpleStats
+from pypaimon.read.scanner.file_scanner import FileScanner
+from pypaimon.schema.data_types import AtomicType, DataField
+from pypaimon.table.row.generic_row import GenericRow
+
+
+# ---------------------------------------------------------------------------
+# Synthetic-fixture helpers used by the unit-level tests.
+# ---------------------------------------------------------------------------
def _stats(min_vals: List[Any], max_vals: List[Any],
           fields: List[DataField],
           null_counts: Optional[List[int]] = None) -> SimpleStats:
    """Build a SimpleStats whose min/max rows are typed by ``fields``.

    When ``null_counts`` is omitted, every column gets a null count of 0
    (one entry per element of ``min_vals``).
    """
    counts = [0] * len(min_vals) if null_counts is None else null_counts
    lower = GenericRow(min_vals, fields)
    upper = GenericRow(max_vals, fields)
    return SimpleStats(min_values=lower, max_values=upper, null_counts=counts)
+
+
def _make_pk_entry(level: int, key_stats: SimpleStats,
                   value_stats: SimpleStats,
                   row_count: int = 1, schema_id: int = 0,
                   value_stats_cols: Optional[List[str]] = None
                   ) -> ManifestEntry:
    """Wrap a synthetic DataFileMeta in a ManifestEntry (kind 0, bucket 0).

    min/max key are taken from ``key_stats``; sequence numbers are 0 and the
    partition is empty. Despite the name, this file also uses it to build
    append-only entries (with ``value_stats_cols`` set).
    """
    # Random suffix keeps synthetic file names distinct; the tests in this
    # file never inspect the name.
    suffix = random.randint(0, 1 << 30)
    data_file = DataFileMeta.create(
        file_name=f'data-L{level}-{suffix}.parquet',
        file_size=1024,
        row_count=row_count,
        min_key=key_stats.min_values,
        max_key=key_stats.max_values,
        key_stats=key_stats,
        value_stats=value_stats,
        min_sequence_number=0,
        max_sequence_number=0,
        schema_id=schema_id,
        level=level,
        extra_files=[],
        value_stats_cols=value_stats_cols,
    )
    empty_partition = GenericRow([], [])
    return ManifestEntry(kind=0, partition=empty_partition, bucket=0,
                         total_buckets=1, file=data_file)
+
+
def _build_pk_scanner(table, predicate: Optional[Predicate]) -> FileScanner:
    """Return a FileScanner for ``table``/``predicate`` without scanning.

    The manifest source is stubbed to yield no entries. The tests in this
    file use the scanner only to call its manifest-entry filter, and they
    use it for append-only tables as well as PK tables.
    """
    def no_manifests():
        return []

    return FileScanner(table=table, manifest_scanner=no_manifests,
                       predicate=predicate)
+
+
+# ---------------------------------------------------------------------------
+# Layer 1 — Unit: assert _filter_manifest_entry behaviour on synthetic input.
+# ---------------------------------------------------------------------------
class FilterManifestEntryUnitTest(unittest.TestCase):
    """Pin down the contract of FileScanner._filter_manifest_entry.

    Each test hands a synthetic ManifestEntry to a FileScanner whose
    manifest source is stubbed out, so only the manifest-level gate runs.
    """

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            cls.catalog.create_database('default', False)

            cls._pa_schema = pa.schema([
                pa.field('id', pa.int64(), nullable=False),
                ('val', pa.int64()),
            ])
            # PK table — exercises the key_stats-only path.
            cls.catalog.create_table(
                'default.pk_plain',
                Schema.from_pyarrow_schema(
                    cls._pa_schema,
                    primary_keys=['id'],
                    options={'bucket': '1', 'file.format': 'parquet'},
                ),
                False,
            )
            # Append-only table — exercises the value_stats path for
            # comparison.
            cls.catalog.create_table(
                'default.append_only',
                Schema.from_pyarrow_schema(
                    cls._pa_schema,
                    options={'file.format': 'parquet',
                             'metadata.stats-mode': 'full'},
                ),
                False,
            )
        except BaseException:
            # unittest does not run tearDownClass when setUpClass raises;
            # remove the temp dir here so a failed setup does not leak it.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    @staticmethod
    def _id_field():
        """Non-nullable BIGINT field ``id`` with field id 0."""
        return DataField(0, 'id', AtomicType('BIGINT', nullable=False))

    @staticmethod
    def _val_field():
        """Nullable BIGINT field ``val`` with field id 1."""
        return DataField(1, 'val', AtomicType('BIGINT', nullable=True))

    # ---- The headline invariant ----------------------------------------
    def test_pk_table_uses_key_stats_only_not_value_stats(self):
        """The whole point: PK manifest pruning must NEVER consult value_stats.

        We construct an entry where:
          * key_stats says id ∈ [10, 20]   ← a plausible PK range
          * value_stats says val ∈ [99, 99] ← stats that, if consulted,
            would let a `val == 0` predicate drop the file.

        The query is on the value column (`val == 0`). On a PK table the
        predicate is projected onto the PK columns only, which yields no
        predicate at all, so the file MUST be kept.

        Why it matters: a file's value_stats describe the rows physically
        stored in that file, not the post-merge state. If the gate dropped
        a file holding the *newest* version of a key, an older, superseded
        version of that key in another file (say with val=0) would survive
        the merge and be returned — a silently incorrect result.
        """
        table = self.catalog.get_table('default.pk_plain')
        pred = table.new_read_builder().new_predicate_builder().equal('val', 0)

        entry = _make_pk_entry(
            level=1,
            key_stats=_stats([10], [20], [self._id_field()]),
            # Lie about val: stats claim 99..99, predicate is val==0.
            value_stats=_stats([99], [99], [self._val_field()]),
        )
        scanner = _build_pk_scanner(table, pred)
        # primary_key_predicate is None for a value-only predicate on PK
        # table → the gate must short-circuit to True regardless of stats.
        self.assertIsNone(scanner.primary_key_predicate,
                          "value predicate must not project onto PK")
        self.assertTrue(scanner._filter_manifest_entry(entry),
                        "PK table must NOT consult value_stats — entry kept")

    def test_pk_table_uses_key_stats_to_drop_outside_range(self):
        """Symmetric: when the predicate IS on PK and key_stats excludes it,
        drop."""
        table = self.catalog.get_table('default.pk_plain')
        pred = table.new_read_builder().new_predicate_builder().equal('id', 5)

        entry = _make_pk_entry(
            level=1,
            key_stats=_stats([10], [20], [self._id_field()]),
            value_stats=_stats([0], [0], [self._val_field()]),
        )
        scanner = _build_pk_scanner(table, pred)
        self.assertFalse(scanner._filter_manifest_entry(entry),
                         "PK predicate outside key_stats range must drop")

    def test_pk_table_uses_key_stats_to_keep_inside_range(self):
        """PK predicate inside the key_stats range keeps the file."""
        table = self.catalog.get_table('default.pk_plain')
        pred = table.new_read_builder().new_predicate_builder().equal('id', 15)

        entry = _make_pk_entry(
            level=1,
            key_stats=_stats([10], [20], [self._id_field()]),
            value_stats=_stats([0], [0], [self._val_field()]),
        )
        scanner = _build_pk_scanner(table, pred)
        self.assertTrue(scanner._filter_manifest_entry(entry))

    def test_pk_compound_predicate_only_consults_key_stats(self):
        """Dual to test_pk_table_uses_key_stats_only_*: covers the
        non-early-return branch.

        Predicate is `id == 15 AND val == 0`. The PK projection yields
        `id == 15` which key_stats [10,20] keeps. value_stats deliberately
        lies that val ∈ [99,99], so a (buggy) implementation that consulted
        value_stats anywhere — even AFTER the early return — would drop the
        file (Equal.test_by_stats(99,99,[0]) is False). The correct
        implementation reaches `primary_key_predicate.test_by_simple_stats`
        which only ever touches key_stats, so the assertion is True.
        """
        table = self.catalog.get_table('default.pk_plain')
        pb = table.new_read_builder().new_predicate_builder()
        pred = pb.and_predicates([
            pb.equal('id', 15),
            pb.equal('val', 0),
        ])

        entry = _make_pk_entry(
            level=1,
            key_stats=_stats([10], [20], [self._id_field()]),
            # value_stats lies: claims val ∈ [99,99]. If consulted, val==0
            # is outside the range and the file would be dropped.
            value_stats=_stats([99], [99], [self._val_field()]),
        )
        scanner = _build_pk_scanner(table, pred)
        # PK predicate is now non-empty (id == 15) — exercises the path
        # that calls primary_key_predicate.test_by_simple_stats(key_stats).
        self.assertIsNotNone(scanner.primary_key_predicate)
        self.assertTrue(scanner._filter_manifest_entry(entry),
                        "value_stats must NEVER be consulted on PK tables — "
                        "even when the PK predicate path runs")

    # ---- Append-only path uses value_stats (and that IS correct) -------
    def _append_entry(self, val_min: int, val_max: int) -> ManifestEntry:
        """Append-only entries: value_stats covers BOTH id and val (the
        projection in SimpleStatsEvolution maps schema indices through
        value_stats_cols)."""
        fields = [self._id_field(), self._val_field()]
        return _make_pk_entry(
            level=0,
            key_stats=_stats([0], [0], [self._id_field()]),
            value_stats=_stats([0, val_min], [0, val_max], fields),
            value_stats_cols=['id', 'val'],
        )

    def test_append_only_uses_value_stats_to_drop(self):
        """Append-only has no L0/merge complications — value_stats pruning
        is safe and required for performance."""
        table = self.catalog.get_table('default.append_only')
        pred = table.new_read_builder().new_predicate_builder().equal('val', 5)

        # value_stats excludes 5 → must drop.
        scanner = _build_pk_scanner(table, pred)
        self.assertFalse(
            scanner._filter_manifest_entry(self._append_entry(10, 20)))

    def test_append_only_uses_value_stats_to_keep(self):
        """value_stats [10, 20] includes 15 → must keep."""
        table = self.catalog.get_table('default.append_only')
        pred = table.new_read_builder().new_predicate_builder().equal(
            'val', 15)

        scanner = _build_pk_scanner(table, pred)
        self.assertTrue(
            scanner._filter_manifest_entry(self._append_entry(10, 20)))
+
+
+# ---------------------------------------------------------------------------
+# Layer 2 — Round-trip integration: write real snapshots, read via public API.
+# ---------------------------------------------------------------------------
class PushdownRoundTripIntegrationTest(unittest.TestCase):
    """Write multi-snapshot tables and verify pushdown vs full-scan oracle."""

    # Suppress compaction so L0 deterministically persists across snapshots.
    _SUPPRESS = {
        'bucket': '1',
        'num-levels': '3',
        'num-sorted-run.compaction-trigger': '999',
        'num-sorted-run.stop-trigger': '999',
        'compaction.max-size-amplification-percent': '999',
    }

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            cls.catalog.create_database('default', False)
        except BaseException:
            # unittest does not run tearDownClass when setUpClass raises.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def _create_pk_table(self, name: str) -> Any:
        """Create ``default.<name>`` with PK ``id`` and value column ``val``,
        using the compaction-suppressing ``_SUPPRESS`` options."""
        opts = dict(self._SUPPRESS)
        pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('val', pa.int64()),
        ])
        schema = Schema.from_pyarrow_schema(
            pa_schema, primary_keys=['id'], options=opts,
        )
        full = f'default.{name}'
        self.catalog.create_table(full, schema, False)
        return self.catalog.get_table(full)

    def _create_append_table(self, name: str) -> Any:
        """Create an unpartitioned append-only ``default.<name>`` with full
        value stats."""
        pa_schema = pa.schema([
            pa.field('id', pa.int64(), nullable=False),
            ('val', pa.int64()),
            ('grp', pa.string()),
        ])
        schema = Schema.from_pyarrow_schema(
            pa_schema,
            options={'file.format': 'parquet', 'metadata.stats-mode': 'full'},
        )
        full = f'default.{name}'
        self.catalog.create_table(full, schema, False)
        return self.catalog.get_table(full)

    def _write(self, table, batches: List[List[Dict]]):
        """Commit each element of ``batches`` as its own snapshot."""
        pa_schema = table.table_schema.to_arrow_schema() if hasattr(
            table.table_schema, 'to_arrow_schema') else None
        for rows in batches:
            wb = table.new_batch_write_builder()
            w = wb.new_write()
            c = wb.new_commit()
            try:
                if pa_schema is not None:
                    batch = pa.Table.from_pylist(rows, schema=pa_schema)
                else:
                    batch = pa.Table.from_pylist(rows)
                w.write_arrow(batch)
                c.commit(w.prepare_commit())
            finally:
                try:
                    w.close()
                finally:
                    # Release the committer even if closing the writer fails.
                    c.close()

    def _read_all(self, table, predicate=None) -> List[Dict]:
        """Read all rows (filtered by ``predicate`` when given) as dicts;
        returns [] when the scan plans no splits."""
        rb = table.new_read_builder()
        if predicate is not None:
            rb = rb.with_filter(predicate)
        scan = rb.new_scan()
        splits = scan.plan().splits()
        if not splits:
            return []
        return rb.new_read().to_arrow(splits).to_pylist()

    # -------------------------------------------------------------------
    # Value-column predicates on an unpartitioned append-only table:
    # value_stats pruning may keep extra files but must never drop a file
    # holding a matching row, so the pushed-down result must equal the
    # full-scan oracle exactly.
    # -------------------------------------------------------------------
    def test_append_value_predicate_matches_oracle(self):
        table = self._create_append_table('rt_append_val')
        rows = [{'id': i, 'val': i * 10, 'grp': 'a' if i % 2 else 'b'}
                for i in range(100)]
        # Two snapshots → (at least) two data files with disjoint val ranges
        # (0..490 and 500..990), so per-file value_stats pruning has
        # something to prune.
        self._write(table, [rows[:50], rows[50:]])

        pred_builder = table.new_read_builder().new_predicate_builder()
        for predicate, oracle_pred in [
            (pred_builder.equal('val', 250), lambda r: r['val'] == 250),
            (pred_builder.greater_than('val', 800), lambda r: r['val'] > 800),
            (pred_builder.between('val', 100, 300),
             lambda r: 100 <= r['val'] <= 300),
            (pred_builder.is_in('grp', ['a']), lambda r: r['grp'] == 'a'),
        ]:
            with self.subTest(predicate=predicate.method):
                got = sorted(self._read_all(table, predicate=predicate),
                             key=lambda r: r['id'])
                want = sorted([r for r in rows if oracle_pred(r)],
                              key=lambda r: r['id'])
                self.assertEqual(
                    got, want,
                    "pushed-down predicate result must equal full-scan oracle")

    def test_pk_pk_predicate_matches_oracle(self):
        """PK predicate on PK table: exact, exercises key_stats path."""
        table = self._create_pk_table('rt_pk_pk_pred')
        rows = [{'id': i, 'val': i * 7} for i in range(50)]
        self._write(table, [rows[:25], rows[25:]])

        pb = table.new_read_builder().new_predicate_builder()
        for predicate, oracle in [
            (pb.equal('id', 13), lambda r: r['id'] == 13),
            (pb.is_in('id', [1, 5, 10, 99]),
             lambda r: r['id'] in {1, 5, 10, 99}),
            (pb.greater_or_equal('id', 40), lambda r: r['id'] >= 40),
        ]:
            with self.subTest(predicate=predicate.method):
                got = sorted(self._read_all(table, predicate=predicate),
                             key=lambda r: r['id'])
                want = sorted([r for r in rows if oracle(r)],
                              key=lambda r: r['id'])
                self.assertEqual(got, want)

    def test_pk_value_predicate_matches_post_merge_oracle(self):
        """A value predicate on a PK table: post-merge oracle == reader output.

        This is the 'value filter applied after merge' contract translated
        into a directly-checkable property: the answer is whatever you'd
        get by (a) merging snapshots into latest-per-PK, (b) applying the
        predicate to that merged set in Python. The ``val <= 5`` case is
        the non-resurrection check: ids 0, 2 and 4 originally had matching
        values but were overwritten with 1000+, so they must not come back.

        Note: this asserts end-to-end semantics only (predicate is applied
        post-merge, not per-file). The "PK manifest gate must not consult
        value_stats" invariant is locked down in the unit tests
        (``test_pk_table_uses_key_stats_only_*``); this round-trip case
        does not by itself catch a regression where the gate misuses
        value_stats, since the file-level effect is masked by the
        post-merge filter.
        """
        table = self._create_pk_table('rt_pk_val_pred')
        # Single source of truth for both the writes and the oracle.
        snapshots = [
            [(i, i) for i in range(20)],
            [(i, i + 1000) for i in range(0, 20, 2)],  # update evens
        ]
        self._write(table, [[{'id': k, 'val': v} for k, v in snapshot]
                            for snapshot in snapshots])
        # Oracle: latest write per PK wins.
        merged: Dict[int, int] = {}
        for snapshot in snapshots:
            merged.update(snapshot)

        pb = table.new_read_builder().new_predicate_builder()
        for predicate, oracle in [
            (pb.greater_than('val', 500), lambda v: v > 500),
            (pb.less_or_equal('val', 5), lambda v: v <= 5),
            (pb.between('val', 1000, 1010), lambda v: 1000 <= v <= 1010),
        ]:
            with self.subTest(predicate=predicate.method):
                got = sorted(self._read_all(table, predicate=predicate),
                             key=lambda r: r['id'])
                want = sorted(
                    [{'id': k, 'val': v} for k, v in merged.items()
                     if oracle(v)],
                    key=lambda r: r['id'])
                self.assertEqual(
                    got, want,
                    "pushed-down value filter must agree with post-merge "
                    "oracle")
+
+
+# ---------------------------------------------------------------------------
+# Layer 3 — Property: random datasets + random predicates.
+# ---------------------------------------------------------------------------
class PushdownPropertyTest(unittest.TestCase):
    """For N random (table, predicate) pairs, push-down result must equal
    the full-scan-then-Python-filter result.

    No hypothesis dependency — keeps Python 3.6 compat. Seeded for repro.
    """

    SEED = 0x70D0  # stable across runs; bump to flush flaky regressions.
    APPEND_TRIALS = 40
    PK_TRIALS = 30

    @classmethod
    def setUpClass(cls):
        cls.tempdir = tempfile.mkdtemp()
        try:
            cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
            cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
            cls.catalog.create_database('default', False)
        except BaseException:
            # unittest does not run tearDownClass when setUpClass raises.
            shutil.rmtree(cls.tempdir, ignore_errors=True)
            raise

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.tempdir, ignore_errors=True)

    def setUp(self):
        # Fresh per-test RNG keeps each test method's dataset/predicate
        # sequence stable regardless of method execution order or future
        # additions in this class.
        self.rnd = random.Random(self.SEED)

    def _make_append_table(self, idx: int):
        """Create append-only ``default.prop_append_<idx>`` (k, v)."""
        pa_schema = pa.schema([
            pa.field('k', pa.int64(), nullable=False),
            ('v', pa.int64()),
        ])
        schema = Schema.from_pyarrow_schema(
            pa_schema,
            options={'file.format': 'parquet', 'metadata.stats-mode': 'full'},
        )
        name = f'default.prop_append_{idx}'
        self.catalog.create_table(name, schema, False)
        return self.catalog.get_table(name)

    def _make_pk_table(self, idx: int):
        """Create PK table ``default.prop_pk_<idx>`` with PK ``k``."""
        pa_schema = pa.schema([
            pa.field('k', pa.int64(), nullable=False),
            ('v', pa.int64()),
        ])
        schema = Schema.from_pyarrow_schema(
            pa_schema,
            primary_keys=['k'],
            options={'bucket': '1', 'file.format': 'parquet'},
        )
        name = f'default.prop_pk_{idx}'
        self.catalog.create_table(name, schema, False)
        return self.catalog.get_table(name)

    def _write_one_snapshot(self, table, rows: List[Dict]):
        """Write ``rows`` and commit them as a single snapshot."""
        wb = table.new_batch_write_builder()
        w = wb.new_write()
        c = wb.new_commit()
        try:
            pa_schema = table.table_schema.to_arrow_schema() if hasattr(
                table.table_schema, 'to_arrow_schema') else None
            batch = pa.Table.from_pylist(rows, schema=pa_schema) \
                if pa_schema is not None else pa.Table.from_pylist(rows)
            w.write_arrow(batch)
            c.commit(w.prepare_commit())
        finally:
            try:
                w.close()
            finally:
                # Release the committer even if closing the writer fails.
                c.close()

    def _read_with(self, table, predicate=None):
        """Read all rows (filtered by ``predicate`` when given) as dicts;
        returns [] when the scan plans no splits."""
        rb = table.new_read_builder()
        if predicate is not None:
            rb = rb.with_filter(predicate)
        splits = rb.new_scan().plan().splits()
        if not splits:
            return []
        return rb.new_read().to_arrow(splits).to_pylist()

    def _pick_range(self, sample_values: List[int]) -> Tuple[int, int]:
        """Return a sorted (lo, hi) pair drawn from ``sample_values``.

        Falls back to a degenerate range (lo == hi) when only one distinct
        value exists; ``Random.sample(values, 2)`` would raise ValueError
        there. The RNG draw on the normal path is unchanged.
        """
        if len(sample_values) < 2:
            only = sample_values[0]
            return only, only
        a, b = sorted(self.rnd.sample(sample_values, 2))
        return a, b

    def _gen_predicate(self, pb, field: str,
                       sample_values: List[int]) -> Tuple[Predicate, Any]:
        """Pick a random predicate on `field`. Returns (predicate, oracle).

        Operator set covers every method the property layer can sensibly
        exercise on numeric columns: equal/not_equal, full ordering family,
        between/not_between, in/not_in, is_null/is_not_null.

        ``sample_values`` must be non-empty. The generated data contains no
        nulls, so the is_null case can only confirm an empty result. It
        cannot detect is_null over-pruning (such as the historical bug where
        missing null_counts made isNull drop every file), because dropping
        every file yields the same empty answer. Covering that needs
        datasets with nulls.
        """
        op = self.rnd.choice([
            'equal', 'not_equal',
            'less_than', 'less_or_equal', 'greater_than', 'greater_or_equal',
            'between', 'not_between',
            'is_in', 'is_not_in',
            'is_null', 'is_not_null',
        ])
        if op == 'equal':
            v = self.rnd.choice(sample_values)
            return pb.equal(field, v), lambda r: r[field] == v
        if op == 'not_equal':
            v = self.rnd.choice(sample_values)
            return pb.not_equal(field, v), lambda r: r[field] != v
        if op == 'less_than':
            v = self.rnd.choice(sample_values)
            return pb.less_than(field, v), lambda r: r[field] < v
        if op == 'less_or_equal':
            v = self.rnd.choice(sample_values)
            return pb.less_or_equal(field, v), lambda r: r[field] <= v
        if op == 'greater_than':
            v = self.rnd.choice(sample_values)
            return pb.greater_than(field, v), lambda r: r[field] > v
        if op == 'greater_or_equal':
            v = self.rnd.choice(sample_values)
            return pb.greater_or_equal(field, v), lambda r: r[field] >= v
        if op == 'between':
            a, b = self._pick_range(sample_values)
            return (pb.between(field, a, b),
                    lambda r, a=a, b=b: a <= r[field] <= b)
        if op == 'not_between':
            a, b = self._pick_range(sample_values)
            return (pb.not_between(field, a, b),
                    lambda r, a=a, b=b: not (a <= r[field] <= b))
        if op == 'is_in':
            n = self.rnd.randint(1, min(5, len(sample_values)))
            xs = self.rnd.sample(sample_values, n)
            return pb.is_in(field, xs), (lambda r, xs=set(xs): r[field] in xs)
        if op == 'is_not_in':
            n = self.rnd.randint(1, min(5, len(sample_values)))
            xs = self.rnd.sample(sample_values, n)
            return (pb.is_not_in(field, xs),
                    lambda r, xs=set(xs): r[field] not in xs)
        if op == 'is_null':
            # No generated row is null, so the oracle is "no row matches".
            # This only checks that no rows are fabricated (see docstring).
            return pb.is_null(field), lambda r: False
        # is_not_null: every generated row is non-null, so all rows must
        # come back; a file wrongly dropped by the gate shows up as
        # missing rows.
        return pb.is_not_null(field), lambda r: True

    # -------------------------------------------------------------------
    # Append-only: oracle = identity over rows; push-down filter result
    # must equal Python-side filter over all rows.
    # -------------------------------------------------------------------
    def test_property_append_random(self):
        for trial in range(self.APPEND_TRIALS):
            table = self._make_append_table(trial)
            n = self.rnd.randint(20, 200)
            rows = [{'k': i, 'v': self.rnd.randint(-50, 50)} for i in range(n)]
            # 1-3 snapshots so we exercise multiple manifests.
            n_snaps = self.rnd.randint(1, 3)
            chunks = self._chunk(rows, n_snaps)
            for chunk in chunks:
                if chunk:
                    self._write_one_snapshot(table, chunk)

            pb = table.new_read_builder().new_predicate_builder()
            field = self.rnd.choice(['k', 'v'])
            sample = sorted({r[field] for r in rows})
            pred, oracle = self._gen_predicate(pb, field, sample)

            got = sorted(self._read_with(table, pred), key=lambda r: r['k'])
            want = sorted([r for r in rows if oracle(r)], key=lambda r: r['k'])
            self.assertEqual(
                got, want,
                f"trial {trial} field={field} method={pred.method} mismatch")

    # -------------------------------------------------------------------
    # PK: oracle = post-merge state (latest write per PK wins). Random
    # multi-snapshot writes can update the same PK.
    # -------------------------------------------------------------------
    def test_property_pk_random(self):
        for trial in range(self.PK_TRIALS):
            table = self._make_pk_table(trial)
            n_snaps = self.rnd.randint(1, 3)
            merged: Dict[int, int] = {}
            for _ in range(n_snaps):
                m = self.rnd.randint(5, 20)
                # Within a single snapshot batch, Paimon's merge resolution
                # for duplicate PKs is not the "last-in-iteration" rule we
                # use as oracle. Avoid the ambiguity by guaranteeing PK
                # uniqueness inside each batch — the cross-snapshot update
                # semantics (the path we actually want to exercise) are
                # unaffected.
                ks = self.rnd.sample(range(40), min(m, 40))
                rows = []
                for k in ks:
                    v = self.rnd.randint(-50, 50)
                    rows.append({'k': k, 'v': v})
                    merged[k] = v  # latest snapshot wins
                self._write_one_snapshot(table, rows)

            pb = table.new_read_builder().new_predicate_builder()
            # PK predicates hit key_stats path; value predicates hit
            # post-merge filter path. Mix both.
            field = self.rnd.choice(['k', 'v'])
            if not merged:
                continue
            sample_values = sorted(merged.keys() if field == 'k'
                                   else set(merged.values()))
            if not sample_values:
                continue
            pred, oracle = self._gen_predicate(pb, field, sample_values)

            got = sorted(self._read_with(table, pred), key=lambda r: r['k'])
            want = sorted(
                [{'k': k, 'v': v} for k, v in merged.items()
                 if oracle({'k': k, 'v': v})],
                key=lambda r: r['k'])
            self.assertEqual(
                got, want,
                f"trial {trial} field={field} method={pred.method} mismatch "
                f"merged={merged}")

    @staticmethod
    def _chunk(items: List, n: int) -> List[List]:
        """Split into n roughly-equal chunks (at least one chunk size)."""
        n = max(1, n)
        size = max(1, (len(items) + n - 1) // n)
        return [items[i:i + size] for i in range(0, len(items), size)]
+
+
if __name__ == "__main__":
    # Allow running this file directly:
    #   python pushdown_correctness_test.py
    unittest.main()