This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 69d9bf3b26 [python] Add ReadBuilder.explain() for scan-plan visibility (#7869)
69d9bf3b26 is described below
commit 69d9bf3b264ab713136ddd23acacf48a1a256b43
Author: chaoyang <[email protected]>
AuthorDate: Tue May 19 15:28:11 2026 +0800
[python] Add ReadBuilder.explain() for scan-plan visibility (#7869)
Add `ReadBuilder.explain()` returning a structured `ExplainResult` so users
can see what a PyPaimon read will actually do: target snapshot, pushed-down
predicate / projection / limit, partition / bucket / file-stats pruning
funnel, and split-level execution signals (raw-convertible ratio,
deletion-vector ratio, level histogram, split-size skew).

The default `__str__` is a compact debug layout; `verbose=True` lists every
split. Reads manifest list + manifests only; data files are never opened.
---
docs/content/pypaimon/python-api.md | 66 +++++
paimon-python/pypaimon/read/explain.py | 241 ++++++++++++++++++
paimon-python/pypaimon/read/explain_render.py | 100 ++++++++
paimon-python/pypaimon/read/read_builder.py | 215 ++++++++++++++++
paimon-python/pypaimon/read/scan_stats.py | 51 ++++
.../pypaimon/read/scanner/file_scanner.py | 83 ++++++-
paimon-python/pypaimon/read/table_scan.py | 11 +-
.../pypaimon/tests/read_builder_explain_test.py | 270 +++++++++++++++++++++
8 files changed, 1027 insertions(+), 10 deletions(-)
diff --git a/docs/content/pypaimon/python-api.md b/docs/content/pypaimon/python-api.md
index 50a0117891..e83e1fa506 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -594,6 +594,72 @@ Key points about shard read:
- **Parallel Processing**: Each shard can be processed independently for
better performance
- **Consistency**: Combining all shards should produce the complete table data
+### Explain Scan Plan
+
+`ReadBuilder.explain()` returns a structured view of the scan plan without reading any data files. It is useful for understanding which splits a query will produce, how aggressively the pushdown pruned the input, and whether the resulting splits can be read on the zero-copy fast path.
+
+```python
+table = catalog.get_table('default.events')
+read_builder = table.new_read_builder()
+predicate_builder = read_builder.new_predicate_builder()
+read_builder = read_builder.with_filter(predicate_builder.equal('dt', '2026-05-16'))
+print(read_builder.explain())
+
+# == PyPaimon Scan Plan ==
+# Table: default.events (PK, HASH_FIXED)
+# Snapshot: 1 (schema 0)
+# Predicate: dt = '2026-05-16'
+# Projection: <all columns>
+# Limit: <none>
+#
+# Partition pruning: 12 -> 4 (pruned 8)
+# Bucket pruning: 4 -> 4 (pruned 0)
+# File skipping: 4 -> 4 (pruned 0)
+#
+# Splits: 4
+# raw-convertible: 4 / 4
+# with DV: 0 / 4
+# all-above-L0: 0 / 4
+# files/split: min=1 max=1 avg=1.00
+# size/split: min=2.8 KiB p50=2.9 KiB p95=3.0 KiB max=3.0 KiB
+#
+# Files: 4
+# Total size: 11.6 KiB
+# Estimated rows: 20 (merged: 20)
+# Level histogram: L0=4
+# Deletion files: 0
+```
+
+Pass `verbose=True` to also list every split with its partition, bucket, file count, size, level histogram, and file paths:
+
+```python
+print(read_builder.explain(verbose=True))
+
+# ...
+#
+# Splits[]
+# [0] partition={'dt': '2026-05-16'} bucket=3 files=1 size=2.9 KiB rows=4 raw=True dv=False
+# levels: L0=1
+# file: /warehouse/default.db/events/dt=2026-05-16/bucket-3/data-...parquet
+# [1] partition={'dt': '2026-05-16'} bucket=2 files=1 size=2.8 KiB rows=2 raw=True dv=False
+# levels: L0=1
+# file: /warehouse/default.db/events/dt=2026-05-16/bucket-2/data-...parquet
+# ...
+```
+
+What the fields tell you:
+
+- **Pushdown** (`Predicate` / `Projection` / `Limit`): exactly what the reader sees after `with_filter` / `with_projection` / `with_limit`.
+- **Pruning funnel** (`Partition pruning` / `Bucket pruning` / `File skipping`): three `before -> after` counts of manifest entries (not partitions or buckets) that show at which stage the predicate paid off. `n/a` means the stage did not apply. Partition pruning and file skipping need a predicate (partition pruning also needs a partitioned table). Bucket pruning is reported whenever the scan applies a bucket filter, for example on HASH_FIXED tables where every bucket key is pinned by the predicate, and on POSTPONE_BUCKET tables that skip their synthetic-bucket entries.
+- **Split shape**: `raw-convertible` counts splits that can be read zero-copy (no merge, no deletion-vector apply); `with DV` counts splits whose files need a deletion vector applied; `all-above-L0` counts splits whose data lives entirely on L1+, i.e. the merge pipeline can skip the L0 buffer.
+- **File aggregates**: total file size and estimated rows (with the post-merge row estimate for primary-key tables in parentheses), plus a level histogram of where the data sits.
+
+{{< hint info >}}
+**Cost**: `explain()` reads the manifest list and manifest files but does not open any data files. It suppresses the manifest reader's early bucket filter and forces single-threaded manifest decoding so the before/after counters are accurate. On tables where the early filter usually prunes aggressively (e.g. very wide HASH_FIXED tables with a tight predicate), this can make `explain()` measurably slower than a regular `new_scan().plan()`.
+{{< /hint >}}
+
+`ExplainResult` is a plain dataclass. Alongside the human-readable `__str__` shown above, every field (`partition_pruning`, `bucket_pruning`, `file_skipping`, `split_count`, `splits_raw_convertible`, `level_histogram`, `splits`, ...) is addressable in Python for programmatic use; `splits` is populated only when `verbose=True`.
+
## Rollback
Paimon supports rolling back a table to a previous snapshot or tag. This is
useful for undoing unwanted changes or
diff --git a/paimon-python/pypaimon/read/explain.py b/paimon-python/pypaimon/read/explain.py
new file mode 100644
index 0000000000..13db6d4157
--- /dev/null
+++ b/paimon-python/pypaimon/read/explain.py
@@ -0,0 +1,241 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+
+@dataclass
+class PruningStat:
+    """Before / after counters for one pruning stage.
+
+    ``before`` is the input size to the stage, ``after`` is the size that
+    survived; either may be ``None`` if it was not measured. A stage that
+    did not run at all is reported as a ``None`` stat on ``ExplainResult``
+    instead (e.g. ``bucket_pruning`` for a BUCKET_UNAWARE append table).
+    """
+
+    before: Optional[int]
+    after: Optional[int]
+
+    @property
+    def pruned(self) -> Optional[int]:
+        if self.before is None or self.after is None:
+            return None
+        return self.before - self.after
+
+    def format(self) -> str:
+        if self.before is None and self.after is None:
+            return "n/a"
+        before = "?" if self.before is None else str(self.before)
+        after = "?" if self.after is None else str(self.after)
+        pruned = self.pruned
+        suffix = "" if pruned is None else " (pruned {})".format(pruned)
+        return "{} -> {}{}".format(before, after, suffix)
+
+
+@dataclass
+class ExplainSplitInfo:
+    """Per-split detail surfaced when ``explain(verbose=True)`` is used."""
+
+    partition: Dict[str, Any]
+    bucket: int
+    file_count: int
+    row_count: int
+    merged_row_count: Optional[int]
+    file_size: int
+    raw_convertible: bool
+    has_deletion_vectors: bool
+    level_histogram: Dict[int, int]
+    deletion_file_count: int
+    file_paths: List[str]
+
+
+@dataclass
+class ExplainResult:
+    """Structured scan plan returned by ``ReadBuilder.explain()``.
+
+    The compact ``__str__`` shows enough signal to reason about cost: the
+    snapshot, the pushed-down predicate / projection / limit, the three
+    pruning before-after counters, split-level shape (raw-convertible
+    ratio, DV ratio, all-above-L0 ratio, files/split, size/split
+    distribution), and the file-level totals. ``verbose=True`` adds a
+    block listing each split.
+    """
+
+    # Identity
+    table_identifier: str
+    is_primary_key_table: bool
+    bucket_mode: str
+    deletion_vectors_enabled: bool
+    data_evolution_enabled: bool
+
+    # Snapshot
+    snapshot_id: Optional[int]
+    schema_id: Optional[int]
+
+    # Pushdown
+    predicate: Optional[str] = None
+    projection: Optional[List[str]] = None
+    limit: Optional[int] = None
+
+    # Pruning (None when not applicable)
+    partition_pruning: Optional[PruningStat] = None
+    bucket_pruning: Optional[PruningStat] = None
+    file_skipping: Optional[PruningStat] = None
+
+    # File-level aggregates over final splits
+    file_count: int = 0
+    total_file_size: int = 0
+    estimated_row_count: int = 0
+    estimated_merged_row_count: Optional[int] = None
+    deletion_file_count: int = 0
+    level_histogram: Dict[int, int] = field(default_factory=dict)
+
+    # Split-level aggregates (shown in compact mode too)
+    split_count: int = 0
+    splits_raw_convertible: int = 0
+    splits_with_deletion_vectors: int = 0
+    splits_all_above_l0: int = 0
+    files_per_split_min: int = 0
+    files_per_split_max: int = 0
+    files_per_split_avg: float = 0.0
+    split_size_min: int = 0
+    split_size_max: int = 0
+    split_size_avg: float = 0.0
+    split_size_p50: int = 0
+    split_size_p95: int = 0
+
+    # Verbose-only
+    splits: Optional[List[ExplainSplitInfo]] = None
+
+    def __str__(self) -> str:
+        return render_explain(self)
+
+
+# ---------------------------------------------------------------------------
+# Pretty-print helpers
+# ---------------------------------------------------------------------------
+
+def render_explain(result: ExplainResult) -> str:
+    out = io.StringIO()
+    out.write("== PyPaimon Scan Plan ==\n")
+
+    flags = []
+    flags.append("PK" if result.is_primary_key_table else "Append")
+    flags.append(result.bucket_mode)
+    if result.deletion_vectors_enabled:
+        flags.append("dv=on")
+    if result.data_evolution_enabled:
+        flags.append("data-evolution=on")
+    _line(out, "Table", "{} ({})".format(result.table_identifier, ", ".join(flags)))
+
+    if result.snapshot_id is None:
+        _line(out, "Snapshot", "<none> (table is empty or has no snapshot)")
+    else:
+        schema_part = "" if result.schema_id is None else " (schema {})".format(result.schema_id)
+        _line(out, "Snapshot", "{}{}".format(result.snapshot_id, schema_part))
+
+    _line(out, "Predicate", result.predicate if result.predicate else "<none>")
+    _line(out, "Projection",
+          "[{}]".format(", ".join(result.projection)) if result.projection else "<all columns>")
+    _line(out, "Limit", str(result.limit) if result.limit is not None else "<none>")
+
+    out.write("\n")
+    _line(out, "Partition pruning",
+          result.partition_pruning.format() if result.partition_pruning else "n/a")
+    _line(out, "Bucket pruning",
+          result.bucket_pruning.format() if result.bucket_pruning else "n/a")
+    _line(out, "File skipping",
+          result.file_skipping.format() if result.file_skipping else "n/a")
+
+    out.write("\n")
+    _line(out, "Splits", str(result.split_count))
+    if result.split_count > 0:
+        out.write(" raw-convertible: {} / {}\n".format(
+            result.splits_raw_convertible, result.split_count))
+        out.write(" with DV: {} / {}\n".format(
+            result.splits_with_deletion_vectors, result.split_count))
+        out.write(" all-above-L0: {} / {}\n".format(
+            result.splits_all_above_l0, result.split_count))
+        out.write(" files/split: min={} max={} avg={:.2f}\n".format(
+            result.files_per_split_min,
+            result.files_per_split_max,
+            result.files_per_split_avg))
+        out.write(" size/split: min={} p50={} p95={} max={}\n".format(
+            _format_size(result.split_size_min),
+            _format_size(result.split_size_p50),
+            _format_size(result.split_size_p95),
+            _format_size(result.split_size_max)))
+
+    out.write("\n")
+    _line(out, "Files", str(result.file_count))
+    _line(out, "Total size", _format_size(result.total_file_size))
+
+    rows = "{:,}".format(result.estimated_row_count)
+    if result.estimated_merged_row_count is not None:
+        rows += " (merged: {:,})".format(result.estimated_merged_row_count)
+    _line(out, "Estimated rows", rows)
+
+    if result.level_histogram:
+        levels = sorted(result.level_histogram.items())
+        levels_str = " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)
+        _line(out, "Level histogram", levels_str)
+    _line(out, "Deletion files", str(result.deletion_file_count))
+
+    if result.splits:
+        out.write("\nSplits[]\n")
+        for idx, split in enumerate(result.splits):
+            out.write(" [{}] partition={} bucket={} files={} size={} rows={} raw={} dv={}\n".format(
+                idx,
+                split.partition,
+                split.bucket,
+                split.file_count,
+                _format_size(split.file_size),
+                split.row_count,
+                split.raw_convertible,
+                split.has_deletion_vectors,
+            ))
+            if split.level_histogram:
+                levels = sorted(split.level_histogram.items())
+                out.write(" levels: {}\n".format(
+                    " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)))
+            for path in split.file_paths:
+                out.write(" file: {}\n".format(path))
+
+    return out.getvalue().rstrip("\n")
+
+
+def _line(out: io.StringIO, label: str, value: str) -> None:
+    out.write("{:<19} {}\n".format(label + ":", value))
+
+
+_SIZE_UNITS = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")
+
+
+def _format_size(num_bytes: Optional[int]) -> str:
+    if num_bytes is None:
+        return "?"
+    size = float(num_bytes)
+    for unit in _SIZE_UNITS:
+        if size < 1024.0 or unit == _SIZE_UNITS[-1]:
+            if unit == "B":
+                return "{:d} {}".format(int(size), unit)
+            return "{:.1f} {}".format(size, unit)
+        size /= 1024.0
+    return "{:.1f} {}".format(size, _SIZE_UNITS[-1])
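The compact layout relies on two small formatting rules in `explain.py`. A pruning stage prints as `before -> after (pruned n)`, or `n/a` when nothing was measured. Byte counts are divided by 1024 into binary units, shown as whole bytes below 1 KiB and with one decimal above. The standalone sketch below re-implements both rules under hypothetical names (`format_stage`, `format_bytes`) so they can be checked without installing pypaimon. It mirrors the logic of `PruningStat.format()` and `_format_size()` but is not that module.

```python
from typing import Optional

_UNITS = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")


def format_stage(before: Optional[int], after: Optional[int]) -> str:
    """Mirror of PruningStat.format(): an unknown side prints as '?'."""
    if before is None and after is None:
        return "n/a"
    left = "?" if before is None else str(before)
    right = "?" if after is None else str(after)
    if before is None or after is None:
        # The pruned count is only defined when both sides are known.
        return "{} -> {}".format(left, right)
    return "{} -> {} (pruned {})".format(left, right, before - after)


def format_bytes(num_bytes: Optional[int]) -> str:
    """Mirror of _format_size(): whole bytes below 1 KiB, one decimal above."""
    if num_bytes is None:
        return "?"
    size = float(num_bytes)
    for unit in _UNITS:
        if size < 1024.0 or unit == _UNITS[-1]:
            if unit == "B":
                return "{:d} {}".format(int(size), unit)
            return "{:.1f} {}".format(size, unit)
        size /= 1024.0
    raise AssertionError("unreachable: the last unit always returns")


print(format_stage(12, 4))       # partition funnel from the docs example
print(format_stage(None, None))  # stage did not apply
print(format_bytes(2970))        # 2970 / 1024 is about 2.90
```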
diff --git a/paimon-python/pypaimon/read/explain_render.py b/paimon-python/pypaimon/read/explain_render.py
new file mode 100644
index 0000000000..2e09da842a
--- /dev/null
+++ b/paimon-python/pypaimon/read/explain_render.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from pypaimon.common.predicate import Predicate
+
+
+_BINARY_OPS = {
+    'equal': '=',
+    'notEqual': '!=',
+    'lessThan': '<',
+    'lessOrEqual': '<=',
+    'greaterThan': '>',
+    'greaterOrEqual': '>=',
+}
+
+
+def render_predicate(predicate: Predicate) -> str:
+    """Render a :class:`Predicate` tree as a human-readable string.
+
+    The renderer relies only on the existing ``method`` / ``field`` /
+    ``literals`` shape and never mutates the predicate. Used by
+    ``ReadBuilder.explain()`` and intentionally lives outside
+    :mod:`pypaimon.common.predicate` so the predicate module stays
+    rendering-agnostic.
+    """
+    if predicate is None:
+        return ""
+    method = predicate.method
+    field = predicate.field
+    literals = predicate.literals
+
+    if method == 'and':
+        return _join_children(literals, 'AND')
+    if method == 'or':
+        return _join_children(literals, 'OR')
+    if method in _BINARY_OPS:
+        return "{} {} {}".format(field, _BINARY_OPS[method], _format_literal(literals[0]))
+    if method == 'in':
+        return "{} IN [{}]".format(field, ", ".join(_format_literal(v) for v in literals))
+    if method == 'notIn':
+        return "{} NOT IN [{}]".format(field, ", ".join(_format_literal(v) for v in literals))
+    if method == 'between':
+        return "{} BETWEEN {} AND {}".format(
+            field, _format_literal(literals[0]), _format_literal(literals[1]))
+    if method == 'notBetween':
+        return "{} NOT BETWEEN {} AND {}".format(
+            field, _format_literal(literals[0]), _format_literal(literals[1]))
+    if method == 'isNull':
+        return "{} IS NULL".format(field)
+    if method == 'isNotNull':
+        return "{} IS NOT NULL".format(field)
+    if method == 'startsWith':
+        return "{} STARTSWITH {}".format(field, _format_literal(literals[0]))
+    if method == 'endsWith':
+        return "{} ENDSWITH {}".format(field, _format_literal(literals[0]))
+    if method == 'contains':
+        return "{} CONTAINS {}".format(field, _format_literal(literals[0]))
+    if method == 'like':
+        return "{} LIKE {}".format(field, _format_literal(literals[0]))
+    # Generic fallback, e.g. "someMethod(field, 'x')"; a missing field is
+    # skipped so literal-only nodes do not render a dangling comma.
+    args = [] if field is None else [str(field)]
+    args.extend(_format_literal(v) for v in (literals or []))
+    return "{}({})".format(method, ", ".join(args))
+
+
+def _join_children(children, joiner: str) -> str:
+    parts = [render_predicate(c) for c in (children or [])]
+    parts = [p for p in parts if p]
+    if not parts:
+        return ""
+    if len(parts) == 1:
+        return parts[0]
+    return " {} ".format(joiner).join("({})".format(p) for p in parts)
+
+
+def _format_literal(value: Any) -> str:
+    if value is None:
+        return "NULL"
+    if isinstance(value, str):
+        return "'{}'".format(value.replace("'", "\\'"))
+    if isinstance(value, bytes):
+        return "b'{}'".format(value.hex())
+    return repr(value)
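`render_predicate` walks the predicate tree recursively. `and` / `or` nodes keep their children in `literals` and join the rendered children with parentheses, leaf nodes map `method` to an infix form, and anything unrecognized falls back to a call-like form. Below is a minimal standalone sketch of the same approach. It assumes a hypothetical `Node` dataclass with the `method` / `field` / `literals` shape the renderer reads (it does not import pypaimon's `Predicate`) and covers only a subset of the operators.

```python
from dataclasses import dataclass
from typing import Any, Optional, Tuple

_INFIX = {"equal": "=", "notEqual": "!=", "lessThan": "<",
          "lessOrEqual": "<=", "greaterThan": ">", "greaterOrEqual": ">="}


@dataclass
class Node:
    """Hypothetical stand-in for a predicate node."""
    method: str
    field: Optional[str] = None
    literals: Tuple[Any, ...] = ()


def lit(value: Any) -> str:
    """Render a literal: NULL, a quoted string, or repr() for anything else."""
    if value is None:
        return "NULL"
    if isinstance(value, str):
        return "'{}'".format(value.replace("'", "\\'"))
    return repr(value)


def render(node: Optional[Node]) -> str:
    if node is None:
        return ""
    if node.method in ("and", "or"):
        # Compound nodes hold their children in ``literals``; empty renders are dropped.
        parts = [p for p in (render(c) for c in node.literals) if p]
        if len(parts) <= 1:
            return parts[0] if parts else ""
        joiner = " {} ".format(node.method.upper())
        return joiner.join("({})".format(p) for p in parts)
    if node.method in _INFIX:
        return "{} {} {}".format(node.field, _INFIX[node.method], lit(node.literals[0]))
    if node.method == "in":
        return "{} IN [{}]".format(node.field, ", ".join(lit(v) for v in node.literals))
    if node.method == "isNull":
        return "{} IS NULL".format(node.field)
    # Fallback: call-like form, skipping a missing field.
    args = ([] if node.field is None else [node.field]) + [lit(v) for v in node.literals]
    return "{}({})".format(node.method, ", ".join(args))


pred = Node("and", literals=(
    Node("equal", "dt", ("2026-05-16",)),
    Node("in", "region", ("eu", "us")),
    Node("isNull", "deleted_at"),
))
print(render(pred))
```

Wrapping every child of a multi-child `AND` / `OR` in parentheses is redundant for flat conjunctions. It does, however, keep nested mixes of `AND` and `OR` unambiguous without any operator-precedence logic.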
diff --git a/paimon-python/pypaimon/read/read_builder.py b/paimon-python/pypaimon/read/read_builder.py
index 13a951df0b..51233856f2 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -19,6 +19,10 @@ from typing import List, Optional
from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.read.explain import ExplainResult, ExplainSplitInfo, PruningStat
+from pypaimon.read.explain_render import render_predicate
+from pypaimon.read.scan_stats import ScanStats
+from pypaimon.read.split import Split
from pypaimon.read.table_read import TableRead
from pypaimon.read.table_scan import TableScan
from pypaimon.schema.data_types import DataField
@@ -97,6 +101,41 @@ class ReadBuilder:
     def new_predicate_builder(self) -> PredicateBuilder:
         return PredicateBuilder(self.read_type())
+    # TODO: surface this through pypaimon's CLI (alongside cli_sql /
+    # cli_table) so users can run `pypaimon explain ...` against a table
+    # without writing any Python.
+    def explain(self, verbose: bool = False) -> ExplainResult:
+        """Produce a structured scan plan for this builder.
+
+        Runs one planning pass (manifest list + manifest reads, no data
+        files) and returns an :class:`ExplainResult` summarising the
+        target snapshot, the pushed-down predicate / projection / limit,
+        the partition / bucket / file-stats pruning funnel, and
+        split-level execution signals (raw-convertible ratio,
+        deletion-vector ratio, level histogram, files-per-split and
+        split-size distribution). With ``verbose=True``, every split is listed.
+
+        Cost: ``explain()`` reads manifest list + manifests but never
+        opens data files. To produce accurate before/after counters it
+        suppresses the manifest-reader's early bucket filter and forces
+        single-threaded manifest decoding, so it can be measurably
+        heavier than a regular ``new_scan().plan()`` on tables where the
+        early filter usually prunes aggressively (e.g. very wide
+        HASH_FIXED tables with a tight predicate).
+        """
+        scan = self.new_scan()
+        plan, stats = scan.scan_with_stats()
+        return _build_explain_result(
+            table=self.table,
+            scan=scan,
+            plan=plan,
+            stats=stats,
+            predicate=self._predicate,
+            projection=self._projection,
+            limit=self._limit,
+            verbose=verbose,
+        )
+
     def read_type(self) -> List[DataField]:
         table_fields = self.table.fields
@@ -158,3 +197,179 @@ class ReadBuilder:
if ok:
paths.append(path)
return paths
+
+
+def _build_explain_result(table, scan: TableScan, plan, stats: ScanStats,
+                          predicate, projection, limit, verbose: bool) -> ExplainResult:
+    """Translate one (Plan, ScanStats) pair into an ExplainResult."""
+    splits: List[Split] = plan.splits()
+
+    table_schema = table.table_schema
+    bucket_mode_str = _safe_bucket_mode(table)
+
+    partition_pruning = _partition_pruning(stats, scan)
+    bucket_pruning = _bucket_pruning(stats, scan)
+    file_skipping = _file_skipping(stats, scan)
+
+    files_per_split = [len(getattr(s, 'files', []) or []) for s in splits]
+    sizes = [int(getattr(s, 'file_size', 0) or 0) for s in splits]
+
+    rows_total = sum(int(getattr(s, 'row_count', 0) or 0) for s in splits)
+    merged_per_split = [s.merged_row_count() for s in splits]
+    if splits and all(v is not None for v in merged_per_split):
+        merged_total: Optional[int] = sum(merged_per_split)
+    else:
+        merged_total = None
+
+    file_count = sum(files_per_split)
+    total_size = sum(sizes)
+    level_hist: dict = {}
+    deletion_file_total = 0
+    splits_raw_convertible = 0
+    splits_with_dv = 0
+    splits_all_above_l0 = 0
+    split_infos: List[ExplainSplitInfo] = []
+
+    for split in splits:
+        files = getattr(split, 'files', []) or []
+        per_split_levels: dict = {}
+        for f in files:
+            lv = getattr(f, 'level', 0) or 0
+            level_hist[lv] = level_hist.get(lv, 0) + 1
+            per_split_levels[lv] = per_split_levels.get(lv, 0) + 1
+        dvs = getattr(split, 'data_deletion_files', None) or []
+        dv_count_here = sum(1 for d in dvs if d is not None)
+        deletion_file_total += dv_count_here
+        has_dv = dv_count_here > 0
+        raw = bool(getattr(split, 'raw_convertible', False))
+        if raw:
+            splits_raw_convertible += 1
+        if has_dv:
+            splits_with_dv += 1
+        if files and all((getattr(f, 'level', 0) or 0) > 0 for f in files):
+            splits_all_above_l0 += 1
+
+        if verbose:
+            split_infos.append(ExplainSplitInfo(
+                partition=_format_partition(split, table),
+                bucket=int(getattr(split, 'bucket', -1)),
+                file_count=len(files),
+                row_count=int(getattr(split, 'row_count', 0) or 0),
+                merged_row_count=split.merged_row_count(),
+                file_size=int(getattr(split, 'file_size', 0) or 0),
+                raw_convertible=raw,
+                has_deletion_vectors=has_dv,
+                level_histogram=per_split_levels,
+                deletion_file_count=dv_count_here,
+                file_paths=list(getattr(split, 'file_paths', []) or []),
+            ))
+
+    fps_min, fps_max, fps_avg = _min_max_avg(files_per_split)
+    sz_min, sz_max, sz_avg = _min_max_avg(sizes)
+    sz_p50 = _percentile(sizes, 50)
+    sz_p95 = _percentile(sizes, 95)
+
+    return ExplainResult(
+        table_identifier=str(table.identifier.get_full_name()),
+        is_primary_key_table=bool(table.is_primary_key_table),
+        bucket_mode=bucket_mode_str,
+        deletion_vectors_enabled=bool(table.options.deletion_vectors_enabled()),
+        data_evolution_enabled=bool(table.options.data_evolution_enabled()),
+        snapshot_id=plan.snapshot_id,
+        schema_id=table_schema.id if plan.snapshot_id is not None else None,
+        predicate=render_predicate(predicate) if predicate is not None else None,
+        projection=list(projection) if projection else None,
+        limit=limit,
+        partition_pruning=partition_pruning,
+        bucket_pruning=bucket_pruning,
+        file_skipping=file_skipping,
+        file_count=file_count,
+        total_file_size=total_size,
+        estimated_row_count=rows_total,
+        estimated_merged_row_count=merged_total,
+        deletion_file_count=deletion_file_total,
+        level_histogram=level_hist,
+        split_count=len(splits),
+        splits_raw_convertible=splits_raw_convertible,
+        splits_with_deletion_vectors=splits_with_dv,
+        splits_all_above_l0=splits_all_above_l0,
+        files_per_split_min=fps_min,
+        files_per_split_max=fps_max,
+        files_per_split_avg=fps_avg,
+        split_size_min=sz_min,
+        split_size_max=sz_max,
+        split_size_avg=sz_avg,
+        split_size_p50=sz_p50,
+        split_size_p95=sz_p95,
+        splits=split_infos if verbose else None,
+    )
+
+
+def _partition_pruning(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]:
+    if scan.predicate is None:
+        return None
+    table_partition_keys = scan.table.partition_keys or []
+    if not table_partition_keys:
+        return None
+    # ``entries_potential_total`` is the count from manifest-file metadata
+    # (manifest-level pruning has not been applied yet). The "after" side
+    # is everything that survived both manifest-stats and per-entry
+    # partition filters.
+    return PruningStat(
+        before=stats.entries_potential_total,
+        after=stats.entries_after_partition,
+    )
+
+
+def _bucket_pruning(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]:
+    # Visible whenever the scan applies any bucket-level filtering — the
+    # HASH_FIXED predicate-driven selector OR the POSTPONE_BUCKET
+    # synthetic-bucket skip. Tables with neither (e.g. BUCKET_UNAWARE
+    # append) leave this counter as ``None``.
+    fs = scan.file_scanner
+    if fs._bucket_selector is None and not fs.only_read_real_buckets:
+        return None
+    return PruningStat(before=stats.entries_after_partition, after=stats.entries_after_bucket)
+
+
+def _file_skipping(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]:
+    # Captures the funnel between bucket-stage survivors and the entries
+    # that actually feed the split generator. The drop here includes both
+    # predicate-driven file-stats pruning AND structural skips that fire
+    # in ``_filter_manifest_entry`` once a file is fully decoded (most
+    # notably the "do not read level-0 file" rule for DV-enabled PK
+    # tables, which is an LSM-shape decision rather than a predicate
+    # test).
+    if scan.predicate is None:
+        return None
+    return PruningStat(before=stats.entries_after_bucket, after=stats.entries_after_stats)
+
+
+def _safe_bucket_mode(table) -> str:
+    try:
+        return table.bucket_mode().name
+    except Exception:
+        return "UNKNOWN"
+
+
+def _format_partition(split, table) -> dict:
+    keys = list(table.partition_keys or [])
+    partition = getattr(split, 'partition', None)
+    if partition is None or not keys:
+        return {}
+    values = getattr(partition, 'values', None) or []
+    return {k: v for k, v in zip(keys, values)}
+
+
+def _min_max_avg(values):
+    if not values:
+        return 0, 0, 0.0
+    return min(values), max(values), sum(values) / float(len(values))
+
+
+def _percentile(values, pct: int) -> int:
+    if not values:
+        return 0
+    ordered = sorted(values)
+    idx = int(round((pct / 100.0) * (len(ordered) - 1)))
+    return int(ordered[idx])
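`_build_explain_result` folds per-split facts into the compact-mode counters, and `_percentile` picks a nearest-rank element from the sorted split sizes. The sketch below reproduces that aggregation on a hypothetical `SplitFacts` record (file levels, size, raw-convertible flag, deletion-file count) so the arithmetic can be checked in isolation. The field names are illustrative and are not pypaimon's `Split` API. Python 3's `round()` rounds halves to even, so for ten splits the p50 index is `round(4.5) == 4`.

```python
from dataclasses import dataclass
from typing import Dict, List, Sequence


@dataclass
class SplitFacts:
    """Hypothetical per-split record; pypaimon reads these from its splits."""
    levels: List[int]        # LSM level of each data file in the split
    size: int                # total file size of the split, in bytes
    raw_convertible: bool
    deletion_files: int = 0


def percentile(values: Sequence[int], pct: int) -> int:
    """Nearest-rank percentile over sorted values, as in _percentile."""
    if not values:
        return 0
    ordered = sorted(values)
    return ordered[int(round(pct / 100.0 * (len(ordered) - 1)))]


def aggregate(splits: List[SplitFacts]) -> Dict[str, object]:
    level_hist: Dict[int, int] = {}
    for s in splits:
        for lv in s.levels:
            level_hist[lv] = level_hist.get(lv, 0) + 1
    sizes = [s.size for s in splits]
    return {
        "splits": len(splits),
        "raw_convertible": sum(1 for s in splits if s.raw_convertible),
        "with_dv": sum(1 for s in splits if s.deletion_files > 0),
        # A split with no files never counts as "all above L0".
        "all_above_l0": sum(1 for s in splits if s.levels and min(s.levels) > 0),
        "level_histogram": dict(sorted(level_hist.items())),
        "size_p50": percentile(sizes, 50),
        "size_p95": percentile(sizes, 95),
    }


facts = [
    SplitFacts(levels=[0], size=2900, raw_convertible=True),
    SplitFacts(levels=[0, 5], size=9000, raw_convertible=False, deletion_files=1),
    SplitFacts(levels=[5, 5], size=4000, raw_convertible=True),
]
print(aggregate(facts))
```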
diff --git a/paimon-python/pypaimon/read/scan_stats.py b/paimon-python/pypaimon/read/scan_stats.py
new file mode 100644
index 0000000000..6626b0c240
--- /dev/null
+++ b/paimon-python/pypaimon/read/scan_stats.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Set, Tuple
+
+
+@dataclass
+class ScanStats:
+    """Counters accumulated by :class:`FileScanner` when it is asked to
+    track stats for ``ReadBuilder.explain()``.
+
+    The scanner mutates these counters in place; consumers should treat
+    instances as immutable once the scan returns. Default factory values
+    keep the dataclass usable both as a blank "no tracking" sentinel and
+    as a live accumulator.
+    """
+
+    manifest_files_total: int = 0
+    manifest_files_after_partition: int = 0
+
+    # ``entries_potential_total`` is the number of manifest entries we would
+    # have processed if no filtering occurred, derived from manifest-file
+    # metadata before the manifest-level partition skip. ``entries_total`` is
+    # what actually reached ``_filter_manifest_entry``; the gap between the
+    # two captures manifest-level pruning.
+    entries_potential_total: int = 0
+    entries_total: int = 0
+    entries_after_partition: int = 0
+    entries_after_bucket: int = 0
+    entries_after_stats: int = 0
+
+    partition_keys_before: Set[Tuple] = field(default_factory=set)
+    partition_keys_after: Set[Tuple] = field(default_factory=set)
+
+    buckets_seen: Set[Tuple[Tuple, int]] = field(default_factory=set)
+    buckets_after_pruning: Set[Tuple[Tuple, int]] = field(default_factory=set)
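`ScanStats` is filled in by `FileScanner._filter_manifest_entry`, which bumps one counter for each stage an entry survives. The gaps between `entries_total`, `entries_after_partition`, `entries_after_bucket` and `entries_after_stats` therefore form the pruning funnel. Below is a minimal sketch of that counting pattern. It assumes hypothetical `Entry` records and plain callables for the three tests (the real scanner uses its partition predicate, bucket selector and file stats, and its last stage also includes structural skips). Counting stays off when `stats` is `None`, matching the opt-in design in which the read hot path leaves `scan_stats` unset.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FunnelStats:
    """Subset of ScanStats' per-entry counters."""
    entries_total: int = 0
    entries_after_partition: int = 0
    entries_after_bucket: int = 0
    entries_after_stats: int = 0


@dataclass
class Entry:
    """Hypothetical manifest entry: partition value, bucket id, column max."""
    partition: str
    bucket: int
    max_value: int


def filter_entries(entries: List[Entry],
                   partition_ok: Callable[[Entry], bool],
                   bucket_ok: Callable[[Entry], bool],
                   stats_ok: Callable[[Entry], bool],
                   stats: Optional[FunnelStats] = None) -> List[Entry]:
    kept = []
    for e in entries:
        if stats is not None:
            stats.entries_total += 1
        if not partition_ok(e):
            continue
        if stats is not None:
            stats.entries_after_partition += 1
        if not bucket_ok(e):
            continue
        if stats is not None:
            stats.entries_after_bucket += 1
        if not stats_ok(e):
            continue
        if stats is not None:
            stats.entries_after_stats += 1
        kept.append(e)
    return kept


entries = [Entry(p, b, m)
           for p in ("2026-05-15", "2026-05-16") for b in (0, 1) for m in (10, 99)]
funnel = FunnelStats()
survivors = filter_entries(
    entries,
    partition_ok=lambda e: e.partition == "2026-05-16",
    bucket_ok=lambda e: e.bucket == 1,
    # File-stats test for a predicate like "col > 50": skip files whose max <= 50.
    stats_ok=lambda e: e.max_value > 50,
    stats=funnel,
)
print(funnel, len(survivors))
```

The counters are updated from a single loop without any locking. This is only safe when the loop runs on one thread, which is why the scanner forces single-threaded manifest decoding while stats are being recorded.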
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 2878d41171..1d2831c194 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -35,6 +35,7 @@ from pypaimon.read.plan import Plan
from pypaimon.read.push_down_utils import (_get_all_fields,
remove_row_id_filter,
trim_and_transform_predicate)
+from pypaimon.read.scan_stats import ScanStats
from pypaimon.read.scanner.append_table_split_generator import \
AppendTableSplitGenerator
from pypaimon.read.scanner.bucket_select_converter import \
@@ -210,6 +211,10 @@ class FileScanner:
         self._global_index_result = None
         self._scanned_snapshot = None
         self._scanned_snapshot_id = None
+        # Opt-in scan-plan tracking. Stays ``None`` for the read hot path;
+        # ``scan_with_stats()`` flips it on for a single explain pass and
+        # the filter callbacks below increment counters when present.
+        self.scan_stats: Optional[ScanStats] = None
 
         # Predicate-driven bucket pruning (HASH_FIXED only). Mirrors Java
         # BucketSelectConverter. Set on demand and reused across all
@@ -345,7 +350,20 @@ class FileScanner:
     def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> List[ManifestEntry]:
         max_workers = self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
+        if self.scan_stats is not None:
+            self.scan_stats.manifest_files_total += len(manifest_files)
+            # ``num_added_files + num_deleted_files`` is the entry count
+            # recorded in the manifest-file metadata; combined across
+            # all input manifest files this is the partition-prune-free
+            # baseline. The difference against ``entries_total`` reveals
+            # how much manifest-level pruning saved.
+            self.scan_stats.entries_potential_total += sum(
+                f.num_added_files + f.num_deleted_files for f in manifest_files)
         manifest_files = [entry for entry in manifest_files if self._filter_manifest_file(entry)]
+        if self.scan_stats is not None:
+            self.scan_stats.manifest_files_after_partition += len(manifest_files)
+            # Force single-threaded so we can mutate stats without locking.
+            max_workers = 1
         return self.manifest_file_manager.read_entries_parallel(
             manifest_files,
             self._filter_manifest_entry,
@@ -364,6 +382,11 @@ class FileScanner:
         still happens later in ``_filter_manifest_entry`` once the entry
         is fully decoded.
         """
+        # explain() needs accurate before/after pruning counters; suppress
+        # the early bucket filter so every entry reaches
+        # ``_filter_manifest_entry`` where each rejection stage is counted.
+        if self.scan_stats is not None:
+            return None
         only_real = self.only_read_real_buckets
         selector = self._bucket_selector
         if not only_real and selector is None:
@@ -402,6 +425,19 @@ class FileScanner:
self._global_index_result = result
return self
+ def scan_with_stats(self) -> Tuple[Plan, ScanStats]:
+ """Run one scan pass while recording :class:`ScanStats` counters.
+
+ Side-effects: forces single-thread manifest reads and disables the
+ early bucket filter so every entry reaches
+ ``_filter_manifest_entry`` exactly once. The scanner is one-shot
+ in this mode — call ``scan()`` on a fresh instance afterwards if
+ you need the regular hot path.
+ """
+ self.scan_stats = ScanStats()
+ plan = self.scan()
+ return plan, self.scan_stats
+
def _apply_push_down_limit(self, splits: List[DataSplit]) -> List[DataSplit]:
"""Mirror Java ``DataTableBatchScan.applyPushDownLimit``: sum the
DV-aware ``merged_row_count`` (== Java ``Split.mergedRowCount()``)
@@ -498,9 +534,30 @@ class FileScanner:
)
def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
- # Redundant safety net: the early filter in the manifest reader
- # already enforces these, but guard here too so this method is
- # self-contained if called outside read_entries_parallel.
+ stats = self.scan_stats
+ if stats is not None:
+ stats.entries_total += 1
+ partition_key = tuple(entry.partition.values)
+ stats.partition_keys_before.add(partition_key)
+ stats.buckets_seen.add((partition_key, entry.bucket))
+ # Stage 1: partition predicate. The early manifest-reader filter
+ # only sees ``(bucket, total_buckets)`` and never enforces
+ # partition predicates, so this check is the sole partition gate
+ # at the entry level — not a "redundant safety net".
+ if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
+ return False
+ if stats is not None:
+ stats.entries_after_partition += 1
+ stats.partition_keys_after.add(partition_key)
+ # Stage 2: bucket rejection. Two reasons land here:
+ # * ``only_read_real_buckets`` drops the synthetic
+ # POSTPONE_BUCKET bucket id (also enforced by the early
+ # filter when present; kept here so the method is correct
+ # standalone).
+ # * ``_bucket_selector`` is the HASH_FIXED predicate-driven
+ # selector built by ``_init_bucket_selector``.
+ # Both are accounted for under ``entries_after_bucket`` so the
+ # explain funnel reports bucket-level pruning end-to-end.
if self.only_read_real_buckets and entry.bucket < 0:
return False
if (self._bucket_selector is not None
@@ -508,8 +565,9 @@ class FileScanner:
and not self._bucket_selector(
entry.partition, entry.bucket, entry.total_buckets)):
return False
- if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
- return False
+ if stats is not None:
+ stats.entries_after_bucket += 1
+ stats.buckets_after_pruning.add((partition_key, entry.bucket))
# Get SimpleStatsEvolution for this schema
evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
@@ -540,14 +598,18 @@ class FileScanner:
entry.file.row_count
):
return False
+ if stats is not None:
+ stats.entries_after_stats += 1
return True
else:
- if not self.predicate:
- return True
- if self.predicate_for_stats is None:
+ if not self.predicate or self.predicate_for_stats is None:
+ if stats is not None:
+ stats.entries_after_stats += 1
return True
# Data evolution: file stats may be from another schema, skip stats filter and filter in reader.
if self.data_evolution:
+ if stats is not None:
+ stats.entries_after_stats += 1
return True
if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
stats_fields = entry.file.write_cols
@@ -558,10 +620,13 @@ class FileScanner:
entry.file.row_count,
stats_fields
)
- return self.predicate_for_stats.test_by_simple_stats(
+ kept = self.predicate_for_stats.test_by_simple_stats(
evolved_stats,
entry.file.row_count
)
+ if kept and stats is not None:
+ stats.entries_after_stats += 1
+ return kept
def _scan_dv_index(self, snapshot, buckets: Set[tuple]) -> Dict[tuple, Dict[str, DeletionFile]]:
"""
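The counters added to `FileScanner` describe a two-level funnel: manifest-level pruning (`entries_potential_total` against `entries_total`), then three entry-level stages in `_filter_manifest_entry` (partition, bucket, file stats). The sketch below models that funnel outside pypaimon so the arithmetic is easy to check. `Entry`, `FunnelStats`, `filter_entry` and `funnel_report` are hypothetical stand-ins: only the counter names come from the diff, the real `ScanStats` may hold more fields, and treating `entries_total` as the manifest stage's "after" count assumes every entry of a surviving manifest reaches the entry filter (which is what disabling the early bucket filter is meant to guarantee).

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Set, Tuple


@dataclass
class Entry:
    # Hypothetical stand-in for a decoded ManifestEntry.
    partition: Tuple[str, ...]
    bucket: int
    min_id: int  # min/max of one column, standing in for file stats
    max_id: int


@dataclass
class FunnelStats:
    # Counter names follow the diff; the class itself is illustrative.
    entries_potential_total: int = 0
    entries_total: int = 0
    entries_after_partition: int = 0
    entries_after_bucket: int = 0
    entries_after_stats: int = 0
    partition_keys_before: Set[Tuple[str, ...]] = field(default_factory=set)
    partition_keys_after: Set[Tuple[str, ...]] = field(default_factory=set)


def filter_entry(entry: Entry, stats: Optional[FunnelStats],
                 partition_ok: Callable[[Tuple[str, ...]], bool],
                 bucket_ok: Callable[[int], bool],
                 stats_ok: Callable[[int, int], bool]) -> bool:
    """Apply the three stages in order, counting the survivors of each."""
    if stats is not None:
        stats.entries_total += 1
        stats.partition_keys_before.add(entry.partition)
    # Stage 1: partition predicate.
    if not partition_ok(entry.partition):
        return False
    if stats is not None:
        stats.entries_after_partition += 1
        stats.partition_keys_after.add(entry.partition)
    # Stage 2: drop synthetic (negative) buckets and unselected buckets.
    if entry.bucket < 0 or not bucket_ok(entry.bucket):
        return False
    if stats is not None:
        stats.entries_after_bucket += 1
    # Stage 3: min/max statistics check.
    kept = stats_ok(entry.min_id, entry.max_id)
    if kept and stats is not None:
        stats.entries_after_stats += 1
    return kept


def funnel_report(stats: FunnelStats) -> List[Tuple[str, int, int]]:
    """(stage, before, after) rows; each stage's input is the previous output."""
    return [
        ("manifest", stats.entries_potential_total, stats.entries_total),
        ("partition", stats.entries_total, stats.entries_after_partition),
        ("bucket", stats.entries_after_partition, stats.entries_after_bucket),
        ("file stats", stats.entries_after_bucket, stats.entries_after_stats),
    ]


# Two partitions x four buckets; bucket b covers ids [10*b, 10*b + 9].
entries = [Entry((day,), b, 10 * b, 10 * b + 9)
           for day in ("2026-05-01", "2026-05-02") for b in range(4)]
# Pretend a pruned manifest held 4 more entries that were never decoded.
stats = FunnelStats(entries_potential_total=12)
kept = [e for e in entries if filter_entry(
    e, stats,
    partition_ok=lambda p: p == ("2026-05-01",),
    bucket_ok=lambda b: b in (1, 2),
    stats_ok=lambda lo, hi: lo <= 15 <= hi)]
print(funnel_report(stats))
# [('manifest', 12, 8), ('partition', 8, 4), ('bucket', 4, 2), ('file stats', 2, 1)]
```

Because each stage's `before` is the previous stage's `after`, the stages must run in a fixed order and a counter may only be bumped once its check has passed, which is the shape the rewritten `_filter_manifest_entry` follows.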
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index d4ac6cfe16..bc610134e0 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -15,12 +15,13 @@
# specific language governing permissions and limitations
# under the License.
-from typing import Optional
+from typing import Optional, Tuple
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.read.plan import Plan
+from pypaimon.read.scan_stats import ScanStats
from pypaimon.read.scanner.file_scanner import FileScanner
from pypaimon.manifest.manifest_list_manager import ManifestListManager
@@ -44,6 +45,14 @@ class TableScan:
def plan(self) -> Plan:
return self.file_scanner.scan()
+ def scan_with_stats(self) -> Tuple[Plan, ScanStats]:
+ """Run :meth:`plan` while recording manifest / pruning counters.
+
+ Only used by :meth:`ReadBuilder.explain`; the regular read path
+ keeps going through :meth:`plan`.
+ """
+ return self.file_scanner.scan_with_stats()
+
def _create_file_scanner(self) -> FileScanner:
options = self.table.options.options
snapshot_manager = self.table.snapshot_manager()
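`scan_with_stats` keeps its counters lock-free by forcing `max_workers = 1`, so `explain()` reads manifests serially. If that ever became a bottleneck on tables with many manifests, one common alternative (not what this commit does) is to let each worker count into its own `collections.Counter` and merge the results after the pool finishes. A minimal sketch, where `read_manifest`, `scan_one`, `parallel_scan_with_stats` and the keep-even-ids rule are hypothetical placeholders for the real manifest decoding and entry filter:

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List, Tuple


def read_manifest(manifest: List[int]) -> Iterable[int]:
    # Hypothetical placeholder: a "manifest" is just a list of entry ids here.
    return iter(manifest)


def scan_one(manifest: List[int]) -> Tuple[List[int], Counter]:
    """Filter one manifest, counting into a Counter owned by this task only."""
    local: Counter = Counter()
    kept: List[int] = []
    for entry in read_manifest(manifest):
        local["entries_total"] += 1
        if entry % 2 == 0:  # placeholder for the real entry filter
            local["entries_kept"] += 1
            kept.append(entry)
    return kept, local


def parallel_scan_with_stats(manifests: List[List[int]],
                             max_workers: int = 4) -> Tuple[List[int], Counter]:
    totals: Counter = Counter()
    kept: List[int] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order, so the merge is deterministic
        # and happens on the calling thread only.
        for part, local in pool.map(scan_one, manifests):
            kept.extend(part)
            totals.update(local)
    return kept, totals


kept, totals = parallel_scan_with_stats([[1, 2, 3], [4, 5], [6]], max_workers=2)
print(kept, totals["entries_total"], totals["entries_kept"])  # [2, 4, 6] 6 3
```

Merging on the calling thread keeps every shared update single-threaded, so no lock is needed. The cost is extra bookkeeping for set-valued stats such as `partition_keys_before`, which would need a set union per worker rather than a sum; forcing a single worker, as the commit does, avoids that entirely.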
diff --git a/paimon-python/pypaimon/tests/read_builder_explain_test.py b/paimon-python/pypaimon/tests/read_builder_explain_test.py
new file mode 100644
index 0000000000..82b0f0387f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/read_builder_explain_test.py
@@ -0,0 +1,270 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for ``ReadBuilder.explain``: pruning funnel counters,
+split-level execution signals, and pretty-print smoke."""
+
+import os
+import shutil
+import tempfile
+import unittest
+from typing import Any, Dict, List
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.predicate import Predicate
+from pypaimon.read.explain import ExplainResult
+from pypaimon.read.explain_render import render_predicate
+
+
+def _write(table, rows: List[Dict], pa_schema: pa.Schema) -> None:
+ wb = table.new_batch_write_builder()
+ w = wb.new_write()
+ c = wb.new_commit()
+ try:
+ w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema))
+ c.commit(w.prepare_commit())
+ finally:
+ w.close()
+ c.close()
+
+
+class ReadBuilderExplainTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', False)
+
+ @classmethod
+ def tearDownClass(cls):
+ shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+ # ---- helpers --------------------------------------------------------
+
+ def _append_table(self, name: str) -> Any:
+ pa_schema = pa.schema([
+ pa.field('id', pa.int64(), nullable=False),
+ ('val', pa.int64()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={'bucket': '-1', 'file.format': 'parquet'},
+ )
+ full = 'default.{}'.format(name)
+ self.catalog.create_table(full, schema, False)
+ return self.catalog.get_table(full), pa_schema
+
+ def _pk_partitioned_bucketed_table(self, name: str, num_buckets: int = 4) -> Any:
+ pa_schema = pa.schema([
+ pa.field('dt', pa.string(), nullable=False),
+ pa.field('id', pa.int64(), nullable=False),
+ ('val', pa.int64()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ partition_keys=['dt'],
+ primary_keys=['dt', 'id'],
+ options={
+ 'bucket': str(num_buckets),
+ 'bucket-key': 'id',
+ 'file.format': 'parquet',
+ },
+ )
+ full = 'default.{}'.format(name)
+ self.catalog.create_table(full, schema, False)
+ return self.catalog.get_table(full), pa_schema
+
+ def _pk_dv_table(self, name: str, num_buckets: int = 2) -> Any:
+ pa_schema = pa.schema([
+ pa.field('id', pa.int64(), nullable=False),
+ ('val', pa.int64()),
+ ])
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ primary_keys=['id'],
+ options={
+ 'bucket': str(num_buckets),
+ 'file.format': 'parquet',
+ 'deletion-vectors.enabled': 'true',
+ },
+ )
+ full = 'default.{}'.format(name)
+ self.catalog.create_table(full, schema, False)
+ return self.catalog.get_table(full), pa_schema
+
+ # ---- 1. append-only baseline ---------------------------------------
+
+ def test_explain_append_only_no_predicate(self):
+ table, pa_schema = self._append_table('explain_basic')
+ _write(table, [{'id': i, 'val': i * 2} for i in range(20)], pa_schema)
+ _write(table, [{'id': i, 'val': i * 3} for i in range(20, 40)], pa_schema)
+
+ rb = table.new_read_builder()
+ result = rb.explain()
+
+ plan_splits = rb.new_scan().plan().splits()
+ self.assertEqual(result.split_count, len(plan_splits))
+ self.assertGreater(result.file_count, 0)
+ self.assertGreater(result.total_file_size, 0)
+ self.assertIsNone(result.partition_pruning)
+ self.assertIsNone(result.bucket_pruning)
+ self.assertIsNone(result.file_skipping)
+ self.assertIsNone(result.predicate)
+
+ # ---- 2. PK + partition + bucket pruning ----------------------------
+
+ def test_explain_pk_table_with_partition_and_bucket_predicate(self):
+ table, pa_schema = self._pk_partitioned_bucketed_table(
+ 'explain_pk_pruning', num_buckets=4)
+ for day in ['2026-05-01', '2026-05-02', '2026-05-03', '2026-05-04']:
+ _write(
+ table,
+ [{'dt': day, 'id': i, 'val': i + 1} for i in range(20)],
+ pa_schema,
+ )
+
+ rb = table.new_read_builder()
+ pb = rb.new_predicate_builder()
+ pred = pb.and_predicates([
+ pb.equal('dt', '2026-05-01'),
+ pb.equal('id', 7),
+ ])
+ rb = rb.with_filter(pred)
+ result = rb.explain()
+
+ self.assertIsNotNone(result.partition_pruning)
+ self.assertIsNotNone(result.bucket_pruning)
+ self.assertIsNotNone(result.file_skipping)
+ self.assertGreater(
+ result.partition_pruning.before, result.partition_pruning.after,
+ "partition predicate must drop at least one entry")
+ self.assertGreater(
+ result.bucket_pruning.before, result.bucket_pruning.after,
+ "HASH_FIXED bucket pruning must drop at least one entry")
+
+ # Cross-check against the actual scan
+ plan_splits = rb.new_scan().plan().splits()
+ self.assertEqual(result.split_count, len(plan_splits))
+
+ # ---- 3. predicate rendering ----------------------------------------
+
+ def test_render_predicate_shapes(self):
+ eq = Predicate(method='equal', index=0, field='dt', literals=['2026-05-01'])
+ self.assertEqual(render_predicate(eq), "dt = '2026-05-01'")
+
+ in_p = Predicate(method='in', index=1, field='id', literals=[1, 2, 3])
+ self.assertEqual(render_predicate(in_p), "id IN [1, 2, 3]")
+
+ between = Predicate(method='between', index=1, field='id', literals=[5, 10])
+ self.assertEqual(render_predicate(between), "id BETWEEN 5 AND 10")
+
+ is_null = Predicate(method='isNull', index=0, field='val', literals=None)
+ self.assertEqual(render_predicate(is_null), "val IS NULL")
+
+ and_p = Predicate(method='and', index=None, field=None, literals=[eq, in_p])
+ self.assertEqual(
+ render_predicate(and_p),
+ "(dt = '2026-05-01') AND (id IN [1, 2, 3])")
+
+ or_p = Predicate(method='or', index=None, field=None, literals=[between, is_null])
+ self.assertEqual(
+ render_predicate(or_p),
+ "(id BETWEEN 5 AND 10) OR (val IS NULL)")
+
+ # ---- 4. verbose split detail ---------------------------------------
+
+ def test_explain_verbose_lists_per_split_detail(self):
+ table, pa_schema = self._append_table('explain_verbose')
+ _write(table, [{'id': i, 'val': i} for i in range(50)], pa_schema)
+ _write(table, [{'id': i, 'val': i} for i in range(50, 100)], pa_schema)
+
+ rb = table.new_read_builder()
+ result = rb.explain(verbose=True)
+ self.assertIsNotNone(result.splits)
+ self.assertEqual(len(result.splits), result.split_count)
+
+ plan_splits = rb.new_scan().plan().splits()
+ for explained, actual in zip(result.splits, plan_splits):
+ self.assertEqual(explained.bucket, actual.bucket)
+ self.assertEqual(explained.file_count, len(actual.files))
+ self.assertEqual(set(explained.file_paths), set(actual.file_paths))
+
+ # ---- 5. empty snapshot path ----------------------------------------
+
+ def test_explain_empty_snapshot(self):
+ table, _ = self._append_table('explain_empty')
+
+ rb = table.new_read_builder()
+ result = rb.explain()
+ self.assertIsNone(result.snapshot_id)
+ self.assertEqual(result.split_count, 0)
+ self.assertEqual(result.file_count, 0)
+ self.assertIn("<none>", str(result))
+
+ # ---- 6. split-level metrics: DV vs append-only ---------------------
+
+ def test_explain_split_level_metrics(self):
+ # Append-only: every split is raw-convertible, no deletion vectors.
+ table, pa_schema = self._append_table('explain_split_signals_ap')
+ _write(table, [{'id': i, 'val': i} for i in range(30)], pa_schema)
+ _write(table, [{'id': i, 'val': i} for i in range(30, 60)], pa_schema)
+ result = table.new_read_builder().explain()
+ self.assertGreater(result.split_count, 0)
+ self.assertEqual(result.splits_with_deletion_vectors, 0)
+ self.assertEqual(result.splits_raw_convertible, result.split_count)
+ # All written files are at L0, and append-only doesn't filter them.
+ self.assertIn(0, result.level_histogram)
+ self.assertEqual(result.splits_all_above_l0, 0)
+
+ # DV-enabled PK table: pypaimon writes alone don't trigger compaction,
+ # so every file stays at L0. ``_filter_manifest_entry`` then drops
+ # them all, leaving an empty plan. The vacuous "all above L0"
+ # invariant (0 / 0) still has to hold.
+ dv_table, dv_pa = self._pk_dv_table('explain_split_signals_dv', num_buckets=2)
+ _write(dv_table, [{'id': i, 'val': i} for i in range(20)], dv_pa)
+ _write(dv_table, [{'id': i, 'val': i * 10} for i in range(10)], dv_pa)
+ dv_result = dv_table.new_read_builder().explain()
+ self.assertEqual(dv_result.split_count, 0)
+ self.assertEqual(dv_result.splits_all_above_l0, 0)
+
+ # ---- 7. pretty-print smoke -----------------------------------------
+
+ def test_pretty_print_smoke(self):
+ table, pa_schema = self._append_table('explain_print_smoke')
+ _write(table, [{'id': i, 'val': i} for i in range(40)], pa_schema)
+
+ result = table.new_read_builder().explain()
+ self.assertIsInstance(result, ExplainResult)
+ printed = str(result)
+ for anchor in (
+ "Snapshot:",
+ "Splits:",
+ "raw-convertible:",
+ "with DV:",
+ "size/split:",
+ "Files:",
+ "Total size:",
+ ):
+ self.assertIn(anchor, printed, "missing anchor: " + anchor)
+
+
+if __name__ == '__main__':
+ unittest.main()
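The split-level signals exercised in test 6 (raw-convertible count, deletion-vector count, level histogram, splits whose files all sit above L0) can be derived from the planned splits alone, without opening data files. The sketch below shows one way to aggregate them. `SplitInfo` and `split_signals` are hypothetical, not the code in `explain.py`; counting the level histogram per file rather than per split, and measuring size skew as largest over smallest split, are assumptions. An empty plan yields zero counts, matching the vacuous 0 / 0 case the DV test relies on.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class SplitInfo:
    # Hypothetical per-split summary built from manifest metadata only.
    raw_convertible: bool
    has_deletion_vectors: bool
    file_levels: List[int]
    file_sizes: List[int]


def split_signals(splits: List[SplitInfo]) -> Dict[str, object]:
    # Files per LSM level; L0 holds freshly written, uncompacted files.
    levels = Counter(lvl for s in splits for lvl in s.file_levels)
    sizes = sorted(sum(s.file_sizes) for s in splits)
    return {
        "split_count": len(splits),
        "splits_raw_convertible": sum(s.raw_convertible for s in splits),
        "splits_with_deletion_vectors": sum(s.has_deletion_vectors for s in splits),
        "level_histogram": dict(levels),
        # all() is True for an empty list, so require at least one file.
        "splits_all_above_l0": sum(
            1 for s in splits
            if s.file_levels and all(lvl > 0 for lvl in s.file_levels)),
        # Hypothetical skew measure: largest split over smallest (None if undefined).
        "size_skew": sizes[-1] / sizes[0] if sizes and sizes[0] > 0 else None,
    }


append_only = [
    SplitInfo(True, False, [0, 0], [100, 200]),
    SplitInfo(True, False, [0], [150]),
]
print(split_signals(append_only))
print(split_signals([]))  # empty plan: all counts are 0, skew is None
```

Keeping the "all above L0" count as a plain integer rather than a ratio sidesteps division by zero when a DV table's plan is empty, which is exactly the situation the second half of test 6 sets up.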