This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 69d9bf3b26 [python] Add ReadBuilder.explain() for scan-plan visibility 
(#7869)
69d9bf3b26 is described below

commit 69d9bf3b264ab713136ddd23acacf48a1a256b43
Author: chaoyang <[email protected]>
AuthorDate: Tue May 19 15:28:11 2026 +0800

    [python] Add ReadBuilder.explain() for scan-plan visibility (#7869)
    
    Add `ReadBuilder.explain()` returning a structured `ExplainResult` so
    users
    can see what a PyPaimon read will actually do — target snapshot,
    pushed-down
    predicate / projection / limit, partition / bucket / file-stats pruning
    funnel, and split-level execution signals (raw-convertible ratio,
    deletion-
    vector ratio, level histogram, split-size skew).
    
    The default `__str__` is a compact debug layout; `verbose=True` lists
    every
    split. Reads manifest list + manifests only — data files are never
    opened.
---
 docs/content/pypaimon/python-api.md                |  66 +++++
 paimon-python/pypaimon/read/explain.py             | 241 ++++++++++++++++++
 paimon-python/pypaimon/read/explain_render.py      | 100 ++++++++
 paimon-python/pypaimon/read/read_builder.py        | 215 ++++++++++++++++
 paimon-python/pypaimon/read/scan_stats.py          |  51 ++++
 .../pypaimon/read/scanner/file_scanner.py          |  83 ++++++-
 paimon-python/pypaimon/read/table_scan.py          |  11 +-
 .../pypaimon/tests/read_builder_explain_test.py    | 270 +++++++++++++++++++++
 8 files changed, 1027 insertions(+), 10 deletions(-)

diff --git a/docs/content/pypaimon/python-api.md 
b/docs/content/pypaimon/python-api.md
index 50a0117891..e83e1fa506 100644
--- a/docs/content/pypaimon/python-api.md
+++ b/docs/content/pypaimon/python-api.md
@@ -594,6 +594,72 @@ Key points about shard read:
 - **Parallel Processing**: Each shard can be processed independently for 
better performance
 - **Consistency**: Combining all shards should produce the complete table data
 
+### Explain Scan Plan
+
+`ReadBuilder.explain()` returns a structured view of the scan plan without 
reading any data files. It is useful for understanding which splits a query 
will produce, how aggressively the pushdown pruned the input, and whether the 
resulting splits can be read on the zero-copy fast path.
+
+```python
+table = catalog.get_table('default.events')
+read_builder = table.new_read_builder()
+predicate_builder = read_builder.new_predicate_builder()
+read_builder = read_builder.with_filter(predicate_builder.equal('dt', 
'2026-05-16'))
+print(read_builder.explain())
+
+# == PyPaimon Scan Plan ==
+# Table:              default.events (PK, HASH_FIXED)
+# Snapshot:           1  (schema 0)
+# Predicate:          dt = '2026-05-16'
+# Projection:         <all columns>
+# Limit:              <none>
+#
+# Partition pruning:  12 -> 4  (pruned 8)
+# Bucket pruning:     4 -> 4  (pruned 0)
+# File skipping:      4 -> 4  (pruned 0)
+#
+# Splits:             4
+#   raw-convertible:  4 / 4
+#   with DV:          0 / 4
+#   all-above-L0:     0 / 4
+#   files/split:      min=1  max=1  avg=1.00
+#   size/split:       min=2.8 KiB  p50=2.9 KiB  p95=3.0 KiB  max=3.0 KiB
+#
+# Files:              4
+# Total size:         11.6 KiB
+# Estimated rows:     20   (merged: 20)
+# Level histogram:    L0=4
+# Deletion files:     0
+```
+
+Pass `verbose=True` to also list every split with its partition, bucket, file 
count, size, level histogram, and file paths:
+
+```python
+print(read_builder.explain(verbose=True))
+
+# ...
+#
+# Splits[]
+#   [0] partition={'dt': '2026-05-16'} bucket=3 files=1 size=2.9 KiB rows=4 
raw=True dv=False
+#       levels: L0=1
+#       file: 
/warehouse/default.db/events/dt=2026-05-16/bucket-3/data-...parquet
+#   [1] partition={'dt': '2026-05-16'} bucket=2 files=1 size=2.8 KiB rows=2 
raw=True dv=False
+#       levels: L0=1
+#       file: 
/warehouse/default.db/events/dt=2026-05-16/bucket-2/data-...parquet
+#   ...
+```
+
+What the fields tell you:
+
+- **Pushdown** (`Predicate` / `Projection` / `Limit`): exactly what the reader 
sees after `with_filter` / `with_projection` / `with_limit`.
+- **Pruning funnel** (`Partition pruning` / `Bucket pruning` / `File 
skipping`): three `before -> after` counts that show at which stage the 
predicate paid off. `n/a` means the stage did not apply — for example, bucket 
pruning is reported for HASH_FIXED tables where every bucket key is pinned by 
the predicate, and for POSTPONE_BUCKET tables that skip their synthetic-bucket 
entries.
+- **Split shape**: `raw-convertible` counts splits that can be read zero-copy 
(no merge, no deletion-vector apply); `with DV` counts splits whose files need 
a deletion vector applied; `all-above-L0` counts splits whose data lives 
entirely on L1+, i.e. the merge pipeline can skip the L0 buffer.
+- **File aggregates**: total file size + estimated rows (with the post-merge 
row estimate for primary-key tables in parentheses), plus a level histogram of 
where the data sits.
+
+{{< hint info >}}
+**Cost**: `explain()` reads the manifest list and manifest files but does not 
open any data files. It suppresses the manifest-reader's early bucket filter 
and forces single-threaded manifest decoding so the before/after counters are 
accurate. On tables where the early filter usually prunes aggressively (e.g. 
very wide HASH_FIXED tables with a tight predicate), this can make `explain()` 
measurably slower than a regular `new_scan().plan()`.
+{{< /hint >}}
+
+`ExplainResult` is a plain dataclass — alongside the human-readable `__str__` 
shown above, every field (`partition_pruning`, `bucket_pruning`, 
`file_skipping`, `split_count`, `splits_raw_convertible`, `level_histogram`, 
`splits`, ...) is addressable in Python for programmatic use.
+
 ## Rollback
 
 Paimon supports rolling back a table to a previous snapshot or tag. This is 
useful for undoing unwanted changes or
diff --git a/paimon-python/pypaimon/read/explain.py 
b/paimon-python/pypaimon/read/explain.py
new file mode 100644
index 0000000000..13db6d4157
--- /dev/null
+++ b/paimon-python/pypaimon/read/explain.py
@@ -0,0 +1,241 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import io
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+
+@dataclass
+class PruningStat:
+    """Before / after counters for one pruning stage.
+
+    ``before`` is the input size to the stage, ``after`` is the size that
+    survived. Either may be ``None`` when the stage did not run (for
+    example, ``bucket_pruning`` is ``None`` for tables that are not
+    HASH_FIXED with all bucket keys pinned).
+    """
+
+    before: Optional[int]
+    after: Optional[int]
+
+    @property
+    def pruned(self) -> Optional[int]:
+        if self.before is None or self.after is None:
+            return None
+        return self.before - self.after
+
+    def format(self) -> str:
+        if self.before is None and self.after is None:
+            return "n/a"
+        before = "?" if self.before is None else str(self.before)
+        after = "?" if self.after is None else str(self.after)
+        pruned = self.pruned
+        suffix = "" if pruned is None else "  (pruned {})".format(pruned)
+        return "{} -> {}{}".format(before, after, suffix)
+
+
+@dataclass
+class ExplainSplitInfo:
+    """Per-split detail surfaced when ``explain(verbose=True)`` is used."""
+
+    partition: Dict[str, Any]
+    bucket: int
+    file_count: int
+    row_count: int
+    merged_row_count: Optional[int]
+    file_size: int
+    raw_convertible: bool
+    has_deletion_vectors: bool
+    level_histogram: Dict[int, int]
+    deletion_file_count: int
+    file_paths: List[str]
+
+
+@dataclass
+class ExplainResult:
+    """Structured scan plan returned by ``ReadBuilder.explain()``.
+
+    The compact ``__str__`` shows enough signal to reason about cost: the
+    snapshot, the pushed-down predicate / projection / limit, the three
+    pruning before-after counters, split-level shape (raw-convertible
+    ratio, DV ratio, all-above-L0 ratio, files/split, size/split
+    distribution), and the file-level totals. ``verbose=True`` adds a
+    block listing each split.
+    """
+
+    # Identity
+    table_identifier: str
+    is_primary_key_table: bool
+    bucket_mode: str
+    deletion_vectors_enabled: bool
+    data_evolution_enabled: bool
+
+    # Snapshot
+    snapshot_id: Optional[int]
+    schema_id: Optional[int]
+
+    # Pushdown
+    predicate: Optional[str] = None
+    projection: Optional[List[str]] = None
+    limit: Optional[int] = None
+
+    # Pruning (None when not applicable)
+    partition_pruning: Optional[PruningStat] = None
+    bucket_pruning: Optional[PruningStat] = None
+    file_skipping: Optional[PruningStat] = None
+
+    # File-level aggregates over final splits
+    file_count: int = 0
+    total_file_size: int = 0
+    estimated_row_count: int = 0
+    estimated_merged_row_count: Optional[int] = None
+    deletion_file_count: int = 0
+    level_histogram: Dict[int, int] = field(default_factory=dict)
+
+    # Split-level aggregates (shown in compact mode too)
+    split_count: int = 0
+    splits_raw_convertible: int = 0
+    splits_with_deletion_vectors: int = 0
+    splits_all_above_l0: int = 0
+    files_per_split_min: int = 0
+    files_per_split_max: int = 0
+    files_per_split_avg: float = 0.0
+    split_size_min: int = 0
+    split_size_max: int = 0
+    split_size_avg: float = 0.0
+    split_size_p50: int = 0
+    split_size_p95: int = 0
+
+    # Verbose-only
+    splits: Optional[List[ExplainSplitInfo]] = None
+
+    def __str__(self) -> str:
+        return render_explain(self)
+
+
+# ---------------------------------------------------------------------------
+# Pretty-print helpers
+# ---------------------------------------------------------------------------
+
+def render_explain(result: ExplainResult) -> str:
+    out = io.StringIO()
+    out.write("== PyPaimon Scan Plan ==\n")
+
+    flags = []
+    flags.append("PK" if result.is_primary_key_table else "Append")
+    flags.append(result.bucket_mode)
+    if result.deletion_vectors_enabled:
+        flags.append("dv=on")
+    if result.data_evolution_enabled:
+        flags.append("data-evolution=on")
+    _line(out, "Table", "{} ({})".format(result.table_identifier, ", 
".join(flags)))
+
+    if result.snapshot_id is None:
+        _line(out, "Snapshot", "<none>  (table is empty or has no snapshot)")
+    else:
+        schema_part = "" if result.schema_id is None else "  (schema 
{})".format(result.schema_id)
+        _line(out, "Snapshot", "{}{}".format(result.snapshot_id, schema_part))
+
+    _line(out, "Predicate", result.predicate if result.predicate else "<none>")
+    _line(out, "Projection",
+          "[{}]".format(", ".join(result.projection)) if result.projection 
else "<all columns>")
+    _line(out, "Limit", str(result.limit) if result.limit is not None else 
"<none>")
+
+    out.write("\n")
+    _line(out, "Partition pruning",
+          result.partition_pruning.format() if result.partition_pruning else 
"n/a")
+    _line(out, "Bucket pruning",
+          result.bucket_pruning.format() if result.bucket_pruning else "n/a")
+    _line(out, "File skipping",
+          result.file_skipping.format() if result.file_skipping else "n/a")
+
+    out.write("\n")
+    _line(out, "Splits", str(result.split_count))
+    if result.split_count > 0:
+        out.write("  raw-convertible:  {} / {}\n".format(
+            result.splits_raw_convertible, result.split_count))
+        out.write("  with DV:          {} / {}\n".format(
+            result.splits_with_deletion_vectors, result.split_count))
+        out.write("  all-above-L0:     {} / {}\n".format(
+            result.splits_all_above_l0, result.split_count))
+        out.write("  files/split:      min={}  max={}  avg={:.2f}\n".format(
+            result.files_per_split_min,
+            result.files_per_split_max,
+            result.files_per_split_avg))
+        out.write("  size/split:       min={}  p50={}  p95={}  
max={}\n".format(
+            _format_size(result.split_size_min),
+            _format_size(result.split_size_p50),
+            _format_size(result.split_size_p95),
+            _format_size(result.split_size_max)))
+
+    out.write("\n")
+    _line(out, "Files", str(result.file_count))
+    _line(out, "Total size", _format_size(result.total_file_size))
+
+    rows = "{:,}".format(result.estimated_row_count)
+    if result.estimated_merged_row_count is not None:
+        rows += "   (merged: {:,})".format(result.estimated_merged_row_count)
+    _line(out, "Estimated rows", rows)
+
+    if result.level_histogram:
+        levels = sorted(result.level_histogram.items())
+        levels_str = "  ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)
+        _line(out, "Level histogram", levels_str)
+    _line(out, "Deletion files", str(result.deletion_file_count))
+
+    if result.splits:
+        out.write("\nSplits[]\n")
+        for idx, split in enumerate(result.splits):
+            out.write("  [{}] partition={} bucket={} files={} size={} rows={} 
raw={} dv={}\n".format(
+                idx,
+                split.partition,
+                split.bucket,
+                split.file_count,
+                _format_size(split.file_size),
+                split.row_count,
+                split.raw_convertible,
+                split.has_deletion_vectors,
+            ))
+            if split.level_histogram:
+                levels = sorted(split.level_histogram.items())
+                out.write("      levels: {}\n".format(
+                    "  ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)))
+            for path in split.file_paths:
+                out.write("      file: {}\n".format(path))
+
+    return out.getvalue().rstrip("\n")
+
+
+def _line(out: io.StringIO, label: str, value: str) -> None:
+    out.write("{:<19} {}\n".format(label + ":", value))
+
+
+_SIZE_UNITS = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")
+
+
+def _format_size(num_bytes: int) -> str:
+    if num_bytes is None:
+        return "?"
+    size = float(num_bytes)
+    for unit in _SIZE_UNITS:
+        if size < 1024.0 or unit == _SIZE_UNITS[-1]:
+            if unit == "B":
+                return "{:d} {}".format(int(size), unit)
+            return "{:.1f} {}".format(size, unit)
+        size /= 1024.0
+    return "{:.1f} {}".format(size, _SIZE_UNITS[-1])
diff --git a/paimon-python/pypaimon/read/explain_render.py 
b/paimon-python/pypaimon/read/explain_render.py
new file mode 100644
index 0000000000..2e09da842a
--- /dev/null
+++ b/paimon-python/pypaimon/read/explain_render.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Any
+
+from pypaimon.common.predicate import Predicate
+
+
+_BINARY_OPS = {
+    'equal': '=',
+    'notEqual': '!=',
+    'lessThan': '<',
+    'lessOrEqual': '<=',
+    'greaterThan': '>',
+    'greaterOrEqual': '>=',
+}
+
+
+def render_predicate(predicate: Predicate) -> str:
+    """Render a :class:`Predicate` tree as a human-readable string.
+
+    The renderer relies only on the existing ``method`` / ``field`` /
+    ``literals`` shape and never mutates the predicate. Used by
+    ``ReadBuilder.explain()`` and intentionally lives outside
+    :mod:`pypaimon.common.predicate` so the predicate module stays
+    rendering-agnostic.
+    """
+    if predicate is None:
+        return ""
+    method = predicate.method
+    field = predicate.field
+    literals = predicate.literals
+
+    if method == 'and':
+        return _join_children(literals, 'AND')
+    if method == 'or':
+        return _join_children(literals, 'OR')
+    if method in _BINARY_OPS:
+        return "{} {} {}".format(field, _BINARY_OPS[method], 
_format_literal(literals[0]))
+    if method == 'in':
+        return "{} IN [{}]".format(field, ", ".join(_format_literal(v) for v 
in literals))
+    if method == 'notIn':
+        return "{} NOT IN [{}]".format(field, ", ".join(_format_literal(v) for 
v in literals))
+    if method == 'between':
+        return "{} BETWEEN {} AND {}".format(
+            field, _format_literal(literals[0]), _format_literal(literals[1]))
+    if method == 'notBetween':
+        return "{} NOT BETWEEN {} AND {}".format(
+            field, _format_literal(literals[0]), _format_literal(literals[1]))
+    if method == 'isNull':
+        return "{} IS NULL".format(field)
+    if method == 'isNotNull':
+        return "{} IS NOT NULL".format(field)
+    if method == 'startsWith':
+        return "{} STARTSWITH {}".format(field, _format_literal(literals[0]))
+    if method == 'endsWith':
+        return "{} ENDSWITH {}".format(field, _format_literal(literals[0]))
+    if method == 'contains':
+        return "{} CONTAINS {}".format(field, _format_literal(literals[0]))
+    if method == 'like':
+        return "{} LIKE {}".format(field, _format_literal(literals[0]))
+    return "{}({}{})".format(
+        method,
+        field if field is not None else "",
+        ", " + ", ".join(_format_literal(v) for v in (literals or [])) if 
literals else "",
+    )
+
+
+def _join_children(children, joiner: str) -> str:
+    parts = [render_predicate(c) for c in (children or [])]
+    parts = [p for p in parts if p]
+    if not parts:
+        return ""
+    if len(parts) == 1:
+        return parts[0]
+    return " {} ".format(joiner).join("({})".format(p) for p in parts)
+
+
+def _format_literal(value: Any) -> str:
+    if value is None:
+        return "NULL"
+    if isinstance(value, str):
+        return "'{}'".format(value.replace("'", "\\'"))
+    if isinstance(value, bytes):
+        return "b'{}'".format(value.hex())
+    return repr(value)
diff --git a/paimon-python/pypaimon/read/read_builder.py 
b/paimon-python/pypaimon/read/read_builder.py
index 13a951df0b..51233856f2 100644
--- a/paimon-python/pypaimon/read/read_builder.py
+++ b/paimon-python/pypaimon/read/read_builder.py
@@ -19,6 +19,10 @@ from typing import List, Optional
 
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
+from pypaimon.read.explain import ExplainResult, ExplainSplitInfo, PruningStat
+from pypaimon.read.explain_render import render_predicate
+from pypaimon.read.scan_stats import ScanStats
+from pypaimon.read.split import Split
 from pypaimon.read.table_read import TableRead
 from pypaimon.read.table_scan import TableScan
 from pypaimon.schema.data_types import DataField
@@ -97,6 +101,41 @@ class ReadBuilder:
     def new_predicate_builder(self) -> PredicateBuilder:
         return PredicateBuilder(self.read_type())
 
+    # TODO: surface this through pypaimon's CLI (alongside cli_sql /
+    # cli_table) so users can run `pypaimon explain ...` against a table
+    # without writing any Python.
+    def explain(self, verbose: bool = False) -> ExplainResult:
+        """Produce a structured scan plan for this builder.
+
+        Runs one planning pass (manifest list + manifest reads, no data
+        files) and returns an :class:`ExplainResult` summarising the
+        target snapshot, the pushed-down predicate / projection / limit,
+        the partition / bucket / file-stats pruning funnel, and split-
+        level execution signals (raw-convertible ratio, deletion-vector
+        ratio, level histogram, files-per-split and split-size
+        distribution). With ``verbose=True``, every split is listed.
+
+        Cost: ``explain()`` reads manifest list + manifests but never
+        opens data files. To produce accurate before/after counters it
+        suppresses the manifest-reader's early bucket filter and forces
+        single-threaded manifest decoding, so it can be measurably
+        heavier than a regular ``new_scan().plan()`` on tables where the
+        early filter usually prunes aggressively (e.g. very wide
+        HASH_FIXED tables with a tight predicate).
+        """
+        scan = self.new_scan()
+        plan, stats = scan.scan_with_stats()
+        return _build_explain_result(
+            table=self.table,
+            scan=scan,
+            plan=plan,
+            stats=stats,
+            predicate=self._predicate,
+            projection=self._projection,
+            limit=self._limit,
+            verbose=verbose,
+        )
+
     def read_type(self) -> List[DataField]:
         table_fields = self.table.fields
 
@@ -158,3 +197,179 @@ class ReadBuilder:
             if ok:
                 paths.append(path)
         return paths
+
+
+def _build_explain_result(table, scan: TableScan, plan, stats: ScanStats,
+                          predicate, projection, limit, verbose: bool) -> 
ExplainResult:
+    """Translate one (Plan, ScanStats) pair into an ExplainResult."""
+    splits: List[Split] = plan.splits()
+
+    table_schema = table.table_schema
+    bucket_mode_str = _safe_bucket_mode(table)
+
+    partition_pruning = _partition_pruning(stats, scan)
+    bucket_pruning = _bucket_pruning(stats, scan)
+    file_skipping = _file_skipping(stats, scan)
+
+    files_per_split = [len(getattr(s, 'files', []) or []) for s in splits]
+    sizes = [int(getattr(s, 'file_size', 0) or 0) for s in splits]
+
+    rows_total = sum(int(getattr(s, 'row_count', 0) or 0) for s in splits)
+    merged_per_split = [s.merged_row_count() for s in splits]
+    if splits and all(v is not None for v in merged_per_split):
+        merged_total: Optional[int] = sum(merged_per_split)
+    else:
+        merged_total = None
+
+    file_count = sum(files_per_split)
+    total_size = sum(sizes)
+    level_hist: dict = {}
+    deletion_file_total = 0
+    splits_raw_convertible = 0
+    splits_with_dv = 0
+    splits_all_above_l0 = 0
+    split_infos: List[ExplainSplitInfo] = []
+
+    for split in splits:
+        files = getattr(split, 'files', []) or []
+        per_split_levels: dict = {}
+        for f in files:
+            lv = getattr(f, 'level', 0) or 0
+            level_hist[lv] = level_hist.get(lv, 0) + 1
+            per_split_levels[lv] = per_split_levels.get(lv, 0) + 1
+        dvs = getattr(split, 'data_deletion_files', None) or []
+        dv_count_here = sum(1 for d in dvs if d is not None)
+        deletion_file_total += dv_count_here
+        has_dv = dv_count_here > 0
+        raw = bool(getattr(split, 'raw_convertible', False))
+        if raw:
+            splits_raw_convertible += 1
+        if has_dv:
+            splits_with_dv += 1
+        if files and all((getattr(f, 'level', 0) or 0) > 0 for f in files):
+            splits_all_above_l0 += 1
+
+        if verbose:
+            split_infos.append(ExplainSplitInfo(
+                partition=_format_partition(split, table),
+                bucket=int(getattr(split, 'bucket', -1)),
+                file_count=len(files),
+                row_count=int(getattr(split, 'row_count', 0) or 0),
+                merged_row_count=split.merged_row_count(),
+                file_size=int(getattr(split, 'file_size', 0) or 0),
+                raw_convertible=raw,
+                has_deletion_vectors=has_dv,
+                level_histogram=per_split_levels,
+                deletion_file_count=dv_count_here,
+                file_paths=list(getattr(split, 'file_paths', []) or []),
+            ))
+
+    fps_min, fps_max, fps_avg = _min_max_avg(files_per_split)
+    sz_min, sz_max, sz_avg = _min_max_avg(sizes)
+    sz_p50 = _percentile(sizes, 50)
+    sz_p95 = _percentile(sizes, 95)
+
+    return ExplainResult(
+        table_identifier=str(table.identifier.get_full_name()),
+        is_primary_key_table=bool(table.is_primary_key_table),
+        bucket_mode=bucket_mode_str,
+        
deletion_vectors_enabled=bool(table.options.deletion_vectors_enabled()),
+        data_evolution_enabled=bool(table.options.data_evolution_enabled()),
+        snapshot_id=plan.snapshot_id,
+        schema_id=table_schema.id if plan.snapshot_id is not None else None,
+        predicate=render_predicate(predicate) if predicate is not None else 
None,
+        projection=list(projection) if projection else None,
+        limit=limit,
+        partition_pruning=partition_pruning,
+        bucket_pruning=bucket_pruning,
+        file_skipping=file_skipping,
+        file_count=file_count,
+        total_file_size=total_size,
+        estimated_row_count=rows_total,
+        estimated_merged_row_count=merged_total,
+        deletion_file_count=deletion_file_total,
+        level_histogram=level_hist,
+        split_count=len(splits),
+        splits_raw_convertible=splits_raw_convertible,
+        splits_with_deletion_vectors=splits_with_dv,
+        splits_all_above_l0=splits_all_above_l0,
+        files_per_split_min=fps_min,
+        files_per_split_max=fps_max,
+        files_per_split_avg=fps_avg,
+        split_size_min=sz_min,
+        split_size_max=sz_max,
+        split_size_avg=sz_avg,
+        split_size_p50=sz_p50,
+        split_size_p95=sz_p95,
+        splits=split_infos if verbose else None,
+    )
+
+
+def _partition_pruning(stats: ScanStats, scan: TableScan) -> 
Optional[PruningStat]:
+    if scan.predicate is None:
+        return None
+    table_partition_keys = scan.table.partition_keys or []
+    if not table_partition_keys:
+        return None
+    # ``entries_potential_total`` is the count from manifest-file metadata
+    # (manifest-level pruning has not been applied yet). The "after" side
+    # is everything that survived both manifest-stats and per-entry
+    # partition filters.
+    return PruningStat(
+        before=stats.entries_potential_total,
+        after=stats.entries_after_partition,
+    )
+
+
+def _bucket_pruning(stats: ScanStats, scan: TableScan) -> 
Optional[PruningStat]:
+    # Visible whenever the scan applies any bucket-level filtering — the
+    # HASH_FIXED predicate-driven selector OR the POSTPONE_BUCKET
+    # synthetic-bucket skip. Tables with neither (e.g. BUCKET_UNAWARE
+    # append) leave this counter as ``None``.
+    fs = scan.file_scanner
+    if fs._bucket_selector is None and not fs.only_read_real_buckets:
+        return None
+    return PruningStat(before=stats.entries_after_partition, 
after=stats.entries_after_bucket)
+
+
+def _file_skipping(stats: ScanStats, scan: TableScan) -> Optional[PruningStat]:
+    # Captures the funnel between bucket-stage survivors and the entries
+    # that actually feed the split generator. The drop here includes both
+    # predicate-driven file-stats pruning AND structural skips that fire
+    # in ``_filter_manifest_entry`` once a file is fully decoded (most
+    # notably the "do not read level-0 file" rule for DV-enabled PK
+    # tables, which is an LSM-shape decision rather than a predicate
+    # test).
+    if scan.predicate is None:
+        return None
+    return PruningStat(before=stats.entries_after_bucket, 
after=stats.entries_after_stats)
+
+
+def _safe_bucket_mode(table) -> str:
+    try:
+        return table.bucket_mode().name
+    except Exception:
+        return "UNKNOWN"
+
+
+def _format_partition(split, table) -> dict:
+    keys = list(table.partition_keys or [])
+    partition = getattr(split, 'partition', None)
+    if partition is None or not keys:
+        return {}
+    values = getattr(partition, 'values', None) or []
+    return {k: v for k, v in zip(keys, values)}
+
+
+def _min_max_avg(values):
+    if not values:
+        return 0, 0, 0.0
+    return min(values), max(values), sum(values) / float(len(values))
+
+
+def _percentile(values, pct: int) -> int:
+    if not values:
+        return 0
+    ordered = sorted(values)
+    idx = int(round((pct / 100.0) * (len(ordered) - 1)))
+    return int(ordered[idx])
diff --git a/paimon-python/pypaimon/read/scan_stats.py 
b/paimon-python/pypaimon/read/scan_stats.py
new file mode 100644
index 0000000000..6626b0c240
--- /dev/null
+++ b/paimon-python/pypaimon/read/scan_stats.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass, field
+from typing import Set, Tuple
+
+
+@dataclass
+class ScanStats:
+    """Counters accumulated by :class:`FileScanner` when it is asked to
+    track stats for ``ReadBuilder.explain()``.
+
+    The scanner mutates these counters in place; consumers should treat
+    instances as immutable once the scan returns. Default factory values
+    keep the dataclass usable both as a blank "no tracking" sentinel and
+    as a live accumulator.
+    """
+
+    manifest_files_total: int = 0
+    manifest_files_after_partition: int = 0
+
+    # ``entries_potential_total`` is the row count we would have processed if 
no
+    # filtering occurred — derived from manifest-file metadata before the
+    # manifest-level partition skip. ``entries_total`` is what actually reached
+    # ``_filter_manifest_entry``; the gap between the two captures
+    # manifest-level pruning.
+    entries_potential_total: int = 0
+    entries_total: int = 0
+    entries_after_partition: int = 0
+    entries_after_bucket: int = 0
+    entries_after_stats: int = 0
+
+    partition_keys_before: Set[Tuple] = field(default_factory=set)
+    partition_keys_after: Set[Tuple] = field(default_factory=set)
+
+    buckets_seen: Set[Tuple[Tuple, int]] = field(default_factory=set)
+    buckets_after_pruning: Set[Tuple[Tuple, int]] = field(default_factory=set)
diff --git a/paimon-python/pypaimon/read/scanner/file_scanner.py 
b/paimon-python/pypaimon/read/scanner/file_scanner.py
index 2878d41171..1d2831c194 100755
--- a/paimon-python/pypaimon/read/scanner/file_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -35,6 +35,7 @@ from pypaimon.read.plan import Plan
 from pypaimon.read.push_down_utils import (_get_all_fields,
                                            remove_row_id_filter,
                                            trim_and_transform_predicate)
+from pypaimon.read.scan_stats import ScanStats
 from pypaimon.read.scanner.append_table_split_generator import \
     AppendTableSplitGenerator
 from pypaimon.read.scanner.bucket_select_converter import \
@@ -210,6 +211,10 @@ class FileScanner:
         self._global_index_result = None
         self._scanned_snapshot = None
         self._scanned_snapshot_id = None
+        # Opt-in scan-plan tracking. Stays ``None`` for the read hot path;
+        # ``scan_with_stats()`` flips it on for a single explain pass and
+        # the filter callbacks below increment counters when present.
+        self.scan_stats: Optional[ScanStats] = None
 
         # Predicate-driven bucket pruning (HASH_FIXED only). Mirrors Java
         # BucketSelectConverter. Set on demand and reused across all
@@ -345,7 +350,20 @@ class FileScanner:
 
     def read_manifest_entries(self, manifest_files: List[ManifestFileMeta]) -> 
List[ManifestEntry]:
         max_workers = 
self.table.options.scan_manifest_parallelism(os.cpu_count() or 8)
+        if self.scan_stats is not None:
+            self.scan_stats.manifest_files_total += len(manifest_files)
+            # ``num_added_files + num_deleted_files`` is the entry count
+            # recorded in the manifest-file metadata; combined across
+            # all input manifest files this is the partition-prune-free
+            # baseline. The difference against ``entries_total`` reveals
+            # how much manifest-level pruning saved.
+            self.scan_stats.entries_potential_total += sum(
+                f.num_added_files + f.num_deleted_files for f in 
manifest_files)
         manifest_files = [entry for entry in manifest_files if 
self._filter_manifest_file(entry)]
+        if self.scan_stats is not None:
+            self.scan_stats.manifest_files_after_partition += 
len(manifest_files)
+            # Force single-threaded so we can mutate stats without locking.
+            max_workers = 1
         return self.manifest_file_manager.read_entries_parallel(
             manifest_files,
             self._filter_manifest_entry,
@@ -364,6 +382,11 @@ class FileScanner:
         still happens later in ``_filter_manifest_entry`` once the entry
         is fully decoded.
         """
+        # explain() needs accurate before/after pruning counters; suppress
+        # the early bucket filter so every entry reaches
+        # ``_filter_manifest_entry`` where each rejection stage is counted.
+        if self.scan_stats is not None:
+            return None
         only_real = self.only_read_real_buckets
         selector = self._bucket_selector
         if not only_real and selector is None:
@@ -402,6 +425,19 @@ class FileScanner:
         self._global_index_result = result
         return self
 
+    def scan_with_stats(self) -> Tuple[Plan, ScanStats]:
+        """Run one scan pass while recording :class:`ScanStats` counters.
+
+        Side-effects: forces single-thread manifest reads and disables the
+        early bucket filter so every entry reaches
+        ``_filter_manifest_entry`` exactly once. The scanner is one-shot
+        in this mode — call ``scan()`` on a fresh instance afterwards if
+        you need the regular hot path.
+        """
+        self.scan_stats = ScanStats()
+        plan = self.scan()
+        return plan, self.scan_stats
+
     def _apply_push_down_limit(self, splits: List[DataSplit]) -> 
List[DataSplit]:
         """Mirror Java ``DataTableBatchScan.applyPushDownLimit``: sum the
         DV-aware ``merged_row_count`` (== Java ``Split.mergedRowCount()``)
@@ -498,9 +534,30 @@ class FileScanner:
         )
 
     def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
-        # Redundant safety net: the early filter in the manifest reader
-        # already enforces these, but guard here too so this method is
-        # self-contained if called outside read_entries_parallel.
+        stats = self.scan_stats
+        if stats is not None:
+            stats.entries_total += 1
+            partition_key = tuple(entry.partition.values)
+            stats.partition_keys_before.add(partition_key)
+            stats.buckets_seen.add((partition_key, entry.bucket))
+        # Stage 1: partition predicate. The early manifest-reader filter
+        # only sees ``(bucket, total_buckets)`` and never enforces
+        # partition predicates, so this check is the sole partition gate
+        # at the entry level — not a "redundant safety net".
+        if self.partition_key_predicate and not 
self.partition_key_predicate.test(entry.partition):
+            return False
+        if stats is not None:
+            stats.entries_after_partition += 1
+            stats.partition_keys_after.add(partition_key)
+        # Stage 2: bucket rejection. Two reasons land here:
+        #   * ``only_read_real_buckets`` drops the synthetic
+        #     POSTPONE_BUCKET bucket id (also enforced by the early
+        #     filter when present; kept here so the method is correct
+        #     standalone).
+        #   * ``_bucket_selector`` is the HASH_FIXED predicate-driven
+        #     selector built by ``_init_bucket_selector``.
+        # Both are accounted for under ``entries_after_bucket`` so the
+        # explain funnel reports bucket-level pruning end-to-end.
         if self.only_read_real_buckets and entry.bucket < 0:
             return False
         if (self._bucket_selector is not None
@@ -508,8 +565,9 @@ class FileScanner:
                 and not self._bucket_selector(
                     entry.partition, entry.bucket, entry.total_buckets)):
             return False
-        if self.partition_key_predicate and not 
self.partition_key_predicate.test(entry.partition):
-            return False
+        if stats is not None:
+            stats.entries_after_bucket += 1
+            stats.buckets_after_pruning.add((partition_key, entry.bucket))
         # Get SimpleStatsEvolution for this schema
         evolution = 
self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
 
@@ -540,14 +598,18 @@ class FileScanner:
                     entry.file.row_count
                 ):
                     return False
+            if stats is not None:
+                stats.entries_after_stats += 1
             return True
         else:
-            if not self.predicate:
-                return True
-            if self.predicate_for_stats is None:
+            if not self.predicate or self.predicate_for_stats is None:
+                if stats is not None:
+                    stats.entries_after_stats += 1
                 return True
             # Data evolution: file stats may be from another schema, skip 
stats filter and filter in reader.
             if self.data_evolution:
+                if stats is not None:
+                    stats.entries_after_stats += 1
                 return True
             if entry.file.value_stats_cols is None and entry.file.write_cols 
is not None:
                 stats_fields = entry.file.write_cols
@@ -558,10 +620,13 @@ class FileScanner:
                 entry.file.row_count,
                 stats_fields
             )
-            return self.predicate_for_stats.test_by_simple_stats(
+            kept = self.predicate_for_stats.test_by_simple_stats(
                 evolved_stats,
                 entry.file.row_count
             )
+            if kept and stats is not None:
+                stats.entries_after_stats += 1
+            return kept
 
     def _scan_dv_index(self, snapshot, buckets: Set[tuple]) -> Dict[tuple, 
Dict[str, DeletionFile]]:
         """
diff --git a/paimon-python/pypaimon/read/table_scan.py 
b/paimon-python/pypaimon/read/table_scan.py
index d4ac6cfe16..bc610134e0 100755
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -15,12 +15,13 @@
 # specific language governing permissions and limitations
 # under the License.
 
-from typing import Optional
+from typing import Optional, Tuple
 
 from pypaimon.common.options.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
 
 from pypaimon.read.plan import Plan
+from pypaimon.read.scan_stats import ScanStats
 from pypaimon.read.scanner.file_scanner import FileScanner
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 
@@ -44,6 +45,14 @@ class TableScan:
     def plan(self) -> Plan:
         return self.file_scanner.scan()
 
+    def scan_with_stats(self) -> Tuple[Plan, ScanStats]:
+        """Run :meth:`plan` while recording manifest / pruning counters.
+
+        Only used by :meth:`ReadBuilder.explain`; the regular read path
+        keeps going through :meth:`plan`.
+        """
+        return self.file_scanner.scan_with_stats()
+
     def _create_file_scanner(self) -> FileScanner:
         options = self.table.options.options
         snapshot_manager = self.table.snapshot_manager()
diff --git a/paimon-python/pypaimon/tests/read_builder_explain_test.py 
b/paimon-python/pypaimon/tests/read_builder_explain_test.py
new file mode 100644
index 0000000000..82b0f0387f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/read_builder_explain_test.py
@@ -0,0 +1,270 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Tests for ``ReadBuilder.explain``: pruning funnel counters,
+split-level execution signals, and pretty-print smoke."""
+
+import os
+import shutil
+import tempfile
+import unittest
+from typing import Any, Dict, List
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.predicate import Predicate
+from pypaimon.read.explain import ExplainResult
+from pypaimon.read.explain_render import render_predicate
+
+
+def _write(table, rows: List[Dict], pa_schema: pa.Schema) -> None:
+    wb = table.new_batch_write_builder()
+    w = wb.new_write()
+    c = wb.new_commit()
+    try:
+        w.write_arrow(pa.Table.from_pylist(rows, schema=pa_schema))
+        c.commit(w.prepare_commit())
+    finally:
+        w.close()
+        c.close()
+
+
+class ReadBuilderExplainTest(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    # ---- helpers --------------------------------------------------------
+
+    def _append_table(self, name: str) -> Any:
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('val', pa.int64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            options={'bucket': '-1', 'file.format': 'parquet'},
+        )
+        full = 'default.{}'.format(name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full), pa_schema
+
+    def _pk_partitioned_bucketed_table(self, name: str, num_buckets: int = 4) 
-> Any:
+        pa_schema = pa.schema([
+            pa.field('dt', pa.string(), nullable=False),
+            pa.field('id', pa.int64(), nullable=False),
+            ('val', pa.int64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=['dt'],
+            primary_keys=['dt', 'id'],
+            options={
+                'bucket': str(num_buckets),
+                'bucket-key': 'id',
+                'file.format': 'parquet',
+            },
+        )
+        full = 'default.{}'.format(name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full), pa_schema
+
+    def _pk_dv_table(self, name: str, num_buckets: int = 2) -> Any:
+        pa_schema = pa.schema([
+            pa.field('id', pa.int64(), nullable=False),
+            ('val', pa.int64()),
+        ])
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            primary_keys=['id'],
+            options={
+                'bucket': str(num_buckets),
+                'file.format': 'parquet',
+                'deletion-vectors.enabled': 'true',
+            },
+        )
+        full = 'default.{}'.format(name)
+        self.catalog.create_table(full, schema, False)
+        return self.catalog.get_table(full), pa_schema
+
+    # ---- 1. append-only baseline ---------------------------------------
+
+    def test_explain_append_only_no_predicate(self):
+        table, pa_schema = self._append_table('explain_basic')
+        _write(table, [{'id': i, 'val': i * 2} for i in range(20)], pa_schema)
+        _write(table, [{'id': i, 'val': i * 3} for i in range(20, 40)], 
pa_schema)
+
+        rb = table.new_read_builder()
+        result = rb.explain()
+
+        plan_splits = rb.new_scan().plan().splits()
+        self.assertEqual(result.split_count, len(plan_splits))
+        self.assertGreater(result.file_count, 0)
+        self.assertGreater(result.total_file_size, 0)
+        self.assertIsNone(result.partition_pruning)
+        self.assertIsNone(result.bucket_pruning)
+        self.assertIsNone(result.file_skipping)
+        self.assertIsNone(result.predicate)
+
+    # ---- 2. PK + partition + bucket pruning ----------------------------
+
+    def test_explain_pk_table_with_partition_and_bucket_predicate(self):
+        table, pa_schema = self._pk_partitioned_bucketed_table(
+            'explain_pk_pruning', num_buckets=4)
+        for day in ['2026-05-01', '2026-05-02', '2026-05-03', '2026-05-04']:
+            _write(
+                table,
+                [{'dt': day, 'id': i, 'val': i + 1} for i in range(20)],
+                pa_schema,
+            )
+
+        rb = table.new_read_builder()
+        pb = rb.new_predicate_builder()
+        pred = pb.and_predicates([
+            pb.equal('dt', '2026-05-01'),
+            pb.equal('id', 7),
+        ])
+        rb = rb.with_filter(pred)
+        result = rb.explain()
+
+        self.assertIsNotNone(result.partition_pruning)
+        self.assertIsNotNone(result.bucket_pruning)
+        self.assertIsNotNone(result.file_skipping)
+        self.assertGreater(
+            result.partition_pruning.before, result.partition_pruning.after,
+            "partition predicate must drop at least one entry")
+        self.assertGreater(
+            result.bucket_pruning.before, result.bucket_pruning.after,
+            "HASH_FIXED bucket pruning must drop at least one entry")
+
+        # Cross-check against the actual scan
+        plan_splits = rb.new_scan().plan().splits()
+        self.assertEqual(result.split_count, len(plan_splits))
+
+    # ---- 3. predicate rendering ----------------------------------------
+
+    def test_render_predicate_shapes(self):
+        eq = Predicate(method='equal', index=0, field='dt', 
literals=['2026-05-01'])
+        self.assertEqual(render_predicate(eq), "dt = '2026-05-01'")
+
+        in_p = Predicate(method='in', index=1, field='id', literals=[1, 2, 3])
+        self.assertEqual(render_predicate(in_p), "id IN [1, 2, 3]")
+
+        between = Predicate(method='between', index=1, field='id', 
literals=[5, 10])
+        self.assertEqual(render_predicate(between), "id BETWEEN 5 AND 10")
+
+        is_null = Predicate(method='isNull', index=0, field='val', 
literals=None)
+        self.assertEqual(render_predicate(is_null), "val IS NULL")
+
+        and_p = Predicate(method='and', index=None, field=None, literals=[eq, 
in_p])
+        self.assertEqual(
+            render_predicate(and_p),
+            "(dt = '2026-05-01') AND (id IN [1, 2, 3])")
+
+        or_p = Predicate(method='or', index=None, field=None, 
literals=[between, is_null])
+        self.assertEqual(
+            render_predicate(or_p),
+            "(id BETWEEN 5 AND 10) OR (val IS NULL)")
+
+    # ---- 4. verbose split detail ---------------------------------------
+
+    def test_explain_verbose_lists_per_split_detail(self):
+        table, pa_schema = self._append_table('explain_verbose')
+        _write(table, [{'id': i, 'val': i} for i in range(50)], pa_schema)
+        _write(table, [{'id': i, 'val': i} for i in range(50, 100)], pa_schema)
+
+        rb = table.new_read_builder()
+        result = rb.explain(verbose=True)
+        self.assertIsNotNone(result.splits)
+        self.assertEqual(len(result.splits), result.split_count)
+
+        plan_splits = rb.new_scan().plan().splits()
+        for explained, actual in zip(result.splits, plan_splits):
+            self.assertEqual(explained.bucket, actual.bucket)
+            self.assertEqual(explained.file_count, len(actual.files))
+            self.assertEqual(set(explained.file_paths), set(actual.file_paths))
+
+    # ---- 5. empty snapshot path ----------------------------------------
+
+    def test_explain_empty_snapshot(self):
+        table, _ = self._append_table('explain_empty')
+
+        rb = table.new_read_builder()
+        result = rb.explain()
+        self.assertIsNone(result.snapshot_id)
+        self.assertEqual(result.split_count, 0)
+        self.assertEqual(result.file_count, 0)
+        self.assertIn("<none>", str(result))
+
+    # ---- 6. split-level metrics: DV vs append-only ---------------------
+
+    def test_explain_split_level_metrics(self):
+        # Append-only: every split is raw-convertible, no deletion vectors.
+        table, pa_schema = self._append_table('explain_split_signals_ap')
+        _write(table, [{'id': i, 'val': i} for i in range(30)], pa_schema)
+        _write(table, [{'id': i, 'val': i} for i in range(30, 60)], pa_schema)
+        result = table.new_read_builder().explain()
+        self.assertGreater(result.split_count, 0)
+        self.assertEqual(result.splits_with_deletion_vectors, 0)
+        self.assertEqual(result.splits_raw_convertible, result.split_count)
+        # All written files are at L0, and append-only doesn't filter them.
+        self.assertIn(0, result.level_histogram)
+        self.assertEqual(result.splits_all_above_l0, 0)
+
+        # DV-enabled PK table: pypaimon writes alone don't trigger compaction,
+        # so every file stays at L0. ``_filter_manifest_entry`` then drops
+        # them all, leaving an empty plan. The vacuous "all above L0"
+        # invariant (0 / 0) still has to hold.
+        dv_table, dv_pa = self._pk_dv_table('explain_split_signals_dv', 
num_buckets=2)
+        _write(dv_table, [{'id': i, 'val': i} for i in range(20)], dv_pa)
+        _write(dv_table, [{'id': i, 'val': i * 10} for i in range(10)], dv_pa)
+        dv_result = dv_table.new_read_builder().explain()
+        self.assertEqual(dv_result.split_count, 0)
+        self.assertEqual(dv_result.splits_all_above_l0, 0)
+
+    # ---- 7. pretty-print smoke -----------------------------------------
+
+    def test_pretty_print_smoke(self):
+        table, pa_schema = self._append_table('explain_print_smoke')
+        _write(table, [{'id': i, 'val': i} for i in range(40)], pa_schema)
+
+        result = table.new_read_builder().explain()
+        self.assertIsInstance(result, ExplainResult)
+        printed = str(result)
+        for anchor in (
+            "Snapshot:",
+            "Splits:",
+            "raw-convertible:",
+            "with DV:",
+            "size/split:",
+            "Files:",
+            "Total size:",
+        ):
+            self.assertIn(anchor, printed, "missing anchor: " + anchor)
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to