This is an automated email from the ASF dual-hosted git repository.

betodealmeida pushed a commit to branch sl-cache
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 209b44522db9ca3daa6cba538a665271368c40b8
Author: Beto Dealmeida <[email protected]>
AuthorDate: Tue May 12 17:17:09 2026 -0400

    Leverage additive metrics
---
 .../src/superset_core/semantic_layers/types.py     |  21 ++
 superset/semantic_layers/cache.py                  | 182 ++++++++++--
 .../semantic_layers/cache_integration_test.py      | 104 +++++++
 tests/unit_tests/semantic_layers/cache_test.py     | 317 +++++++++++++++++++--
 4 files changed, 572 insertions(+), 52 deletions(-)

diff --git a/superset-core/src/superset_core/semantic_layers/types.py 
b/superset-core/src/superset_core/semantic_layers/types.py
index 1239c1303be..3bfa9e8c315 100644
--- a/superset-core/src/superset_core/semantic_layers/types.py
+++ b/superset-core/src/superset_core/semantic_layers/types.py
@@ -92,6 +92,26 @@ class Dimension:
     grain: Grain | None = None
 
 
+class AggregationType(str, enum.Enum):
+    """
+    Aggregation function applied by a metric.
+
+    Additivity (across an arbitrary set of grouping dimensions):
+    * ``SUM``, ``COUNT``: fully additive — sub-group sums roll up via ``sum``.
+    * ``MIN``, ``MAX``: roll up via ``min`` / ``max`` of sub-group values.
+    * ``AVG``, ``COUNT_DISTINCT``, ``OTHER``: not safely roll-uppable from
+      sub-aggregates without auxiliary data.
+    """
+
+    SUM = "SUM"
+    COUNT = "COUNT"
+    MIN = "MIN"
+    MAX = "MAX"
+    AVG = "AVG"
+    COUNT_DISTINCT = "COUNT_DISTINCT"
+    OTHER = "OTHER"
+
+
 @dataclass(frozen=True)
 class Metric:
     id: str
@@ -100,6 +120,7 @@ class Metric:
 
     definition: str
     description: str | None = None
+    aggregation: AggregationType | None = None
 
 
 @dataclass(frozen=True)
diff --git a/superset/semantic_layers/cache.py 
b/superset/semantic_layers/cache.py
index b69a8803cbf..ef5cdc68eb3 100644
--- a/superset/semantic_layers/cache.py
+++ b/superset/semantic_layers/cache.py
@@ -50,10 +50,12 @@ import pyarrow as pa
 from flask import current_app
 from superset_core.semantic_layers.types import (
     AdhocExpression,
+    AggregationType,
     Dimension,
     Filter,
     Metric,
     Operator,
+    OrderDirection,
     OrderTuple,
     PredicateType,
     SemanticQuery,
@@ -64,6 +66,7 @@ from superset_core.semantic_layers.types import (
 from superset.extensions import cache_manager
 from superset.utils import json
 from superset.utils.hashing import hash_from_str
+from superset.utils.pandas_postprocessing.aggregate import aggregate
 
 logger = logging.getLogger(__name__)
 
@@ -71,6 +74,14 @@ INDEX_KEY_PREFIX = "sv:idx:"
 VALUE_KEY_PREFIX = "sv:val:"
 MAX_ENTRIES_PER_SHAPE = 32
 
+_AGGREGATION_TO_PANDAS: dict[AggregationType, str] = {
+    AggregationType.SUM: "sum",
+    AggregationType.COUNT: "sum",
+    AggregationType.MIN: "min",
+    AggregationType.MAX: "max",
+}
+ADDITIVE_AGGREGATIONS = frozenset(_AGGREGATION_TO_PANDAS)
+
 
 @dataclass(frozen=True)
 class ViewMeta:
@@ -84,6 +95,7 @@ class ViewMeta:
 @dataclass(frozen=True)
 class CachedEntry:
     filters: frozenset[Filter]
+    dimension_keys: frozenset[str]
     limit: int | None
     offset: int
     order_key: str
@@ -113,14 +125,16 @@ def try_serve_from_cache(
         served: SemanticResult | None = None
         for entry in entries:
             if served is None:
-                ok, leftovers = can_satisfy(entry, query)
+                ok, leftovers, projection_needed = can_satisfy(entry, query)
                 if ok:
                     payload = cache.get(entry.value_key)
                     if payload is None:
                         # value evicted but index entry survived; drop it
                         continue
                     pruned.append(entry)
-                    served = _apply_post_processing(payload, query, leftovers)
+                    served = _apply_post_processing(
+                        payload, query, leftovers, projection_needed
+                    )
                     continue
             # keep entry; verify its value is still alive
             if cache.get(entry.value_key) is not None:
@@ -150,6 +164,7 @@ def store_result(
         entries: list[CachedEntry] = list(cache.get(idx_key) or [])
         entry = CachedEntry(
             filters=frozenset(query.filters or set()),
+            dimension_keys=frozenset(_dimension_key(d) for d in 
query.dimensions),
             limit=query.limit,
             offset=query.offset or 0,
             order_key=_order_key(query.order),
@@ -173,10 +188,10 @@ def store_result(
 
 
 def shape_key(view_meta: ViewMeta, query: SemanticQuery) -> str:
-    shape = {
-        "m": sorted(m.id for m in query.metrics),
-        "d": sorted(_dimension_key(d) for d in query.dimensions),
-    }
+    # The shape key buckets entries by metric set only; dimensions live on each
+    # ``CachedEntry`` so we can find broader (dimension-superset) entries via 
the
+    # projection path.
+    shape = {"m": sorted(m.id for m in query.metrics)}
     digest = hash_from_str(json.dumps(shape, sort_keys=True))[:16]
     return 
f"{INDEX_KEY_PREFIX}{view_meta.uuid}:{view_meta.changed_on_iso}:{digest}"
 
@@ -270,26 +285,42 @@ def _timeout(view_meta: ViewMeta) -> int | None:
 def can_satisfy(  # noqa: C901
     entry: CachedEntry,
     query: SemanticQuery,
-) -> tuple[bool, set[Filter]]:
-    """Return ``(reusable, leftover_filters_to_apply)`` for ``entry`` vs 
``query``."""
+) -> tuple[bool, set[Filter], bool]:
+    """
+    Return ``(reusable, leftover_filters, projection_needed)`` for ``entry`` vs
+    ``query``. ``projection_needed`` is True when the cached entry has a strict
+    superset of the new dimensions and a pandas rollup is required.
+    """
+    new_dim_keys = frozenset(_dimension_key(d) for d in query.dimensions)
+    cached_dim_keys = entry.dimension_keys
+
+    if cached_dim_keys == new_dim_keys:
+        projection_needed = False
+    elif cached_dim_keys > new_dim_keys:
+        projection_needed = True
+        if not _projection_allowed(entry, query, new_dim_keys, 
cached_dim_keys):
+            return False, set(), False
+    else:
+        return False, set(), False
+
     new_filters = frozenset(query.filters or set())
 
     c_adhoc, c_having, c_where = _split(entry.filters)
     n_adhoc, n_having, n_where = _split(new_filters)
 
     if c_adhoc != n_adhoc:
-        return False, set()
+        return False, set(), False
     if c_having != n_having:
-        return False, set()
+        return False, set(), False
 
     c_by_col = _group_by_column(c_where)
     n_by_col = _group_by_column(n_where)
 
-    for col_id, c_list in c_by_col.items():
-        n_list = n_by_col.get(col_id, [])
+    for c_list in c_by_col.values():
         for c in c_list:
+            n_list = n_by_col.get(_filter_col_id(c), [])
             if not any(_implies(n, c) for n in n_list):
-                return False, set()
+                return False, set(), False
 
     leftovers: set[Filter] = set()
     for col_id, n_list in n_by_col.items():
@@ -297,27 +328,72 @@ def can_satisfy(  # noqa: C901
         for n in n_list:
             if not any(_implies(c, n) for c in c_list):
                 if n.column is None or n.operator == Operator.ADHOC:
-                    return False, set()
+                    return False, set(), False
                 leftovers.add(n)
 
-    projection_ids = _projection_ids(query)
+    # Leftover filters are applied to the cached DataFrame BEFORE the optional
+    # rollup, so their columns must be present in the cached projection.
+    allowed_ids = _cached_column_ids(entry, query)
     for leftover in leftovers:
-        if leftover.column is None or leftover.column.id not in projection_ids:
-            return False, set()
+        if leftover.column is None or leftover.column.id not in allowed_ids:
+            return False, set(), False
 
     if entry.offset != 0 or (query.offset or 0) != 0:
-        return False, set()
+        return False, set(), False
+
+    if projection_needed:
+        # Re-aggregation will re-order by ``query.order`` after rollup, so the
+        # cached order is irrelevant. We do require the new order (if any) to
+        # reference only surviving columns; otherwise sort would fail 
post-rollup.
+        if not _order_uses_only(query.order, _projection_ids(query)):
+            return False, set(), False
+    else:
+        if entry.limit is not None:
+            if query.limit is None or query.limit > entry.limit:
+                return False, set(), False
+            if entry.order_key != _order_key(query.order):
+                return False, set(), False
 
+        if entry.group_limit_key != _group_limit_key(query.group_limit):
+            return False, set(), False
+
+    return True, leftovers, projection_needed
+
+
+def _projection_allowed(
+    entry: CachedEntry,
+    query: SemanticQuery,
+    new_dim_keys: frozenset[str],
+    cached_dim_keys: frozenset[str],
+) -> bool:
+    """
+    Gates for the projection path (above and beyond filter containment).
+    """
+    if any(m.aggregation not in ADDITIVE_AGGREGATIONS for m in query.metrics):
+        return False
+    # Cached truncation makes the rollup unsafe (we're missing rows).
     if entry.limit is not None:
-        if query.limit is None or query.limit > entry.limit:
-            return False, set()
-        if entry.order_key != _order_key(query.order):
-            return False, set()
+        return False
+    if entry.group_limit_key:
+        return False
+    # Cached HAVING dropped sub-aggregate rows; the rolled-up totals would be
+    # off. Conservative: skip the projection path when cached has any HAVING.
+    if any(f.type == PredicateType.HAVING for f in entry.filters):
+        return False
+    return True
+
 
-    if entry.group_limit_key != _group_limit_key(query.group_limit):
-        return False, set()
+def _filter_col_id(f: Filter) -> str | None:
+    return f.column.id if f.column is not None else None
 
-    return True, leftovers
+
+def _order_uses_only(
+    order: list[OrderTuple] | None,
+    allowed_ids: set[str],
+) -> bool:
+    if not order:
+        return True
+    return all(_orderable_id(element) in allowed_ids for element, _ in order)
 
 
 def _split(
@@ -348,6 +424,12 @@ def _projection_ids(query: SemanticQuery) -> set[str]:
     return {d.id for d in query.dimensions} | {m.id for m in query.metrics}
 
 
+def _cached_column_ids(entry: CachedEntry, query: SemanticQuery) -> set[str]:
+    """Column ids available in the cached DataFrame (cached dims + shared 
metrics)."""
+    cached_dim_ids = {key.rsplit("@", 1)[0] for key in entry.dimension_keys}
+    return cached_dim_ids | {m.id for m in query.metrics}
+
+
 # ---------------------------------------------------------------------------
 # Pairwise implication
 # ---------------------------------------------------------------------------
@@ -522,9 +604,10 @@ def _apply_post_processing(
     cached: SemanticResult,
     query: SemanticQuery,
     leftovers: set[Filter],
+    projection_needed: bool,
 ) -> SemanticResult:
-    """Apply leftover filters and the new limit to a cached result."""
-    if not leftovers and query.limit is None:
+    """Apply leftover filters, projection (re-aggregation), order, and 
limit."""
+    if not leftovers and not projection_needed and query.limit is None:
         return cached
 
     df = cached.results.to_pandas()
@@ -533,17 +616,54 @@ def _apply_post_processing(
         for f in leftovers:
             mask &= _mask_for(df, f)
         df = df[mask]
+
+    note_def = "Served from semantic view smart cache (post-processed locally)"
+    if projection_needed:
+        groupby = [d.name for d in query.dimensions]
+        aggregates = {
+            m.name: {
+                "column": m.name,
+                "operator": _AGGREGATION_TO_PANDAS[
+                    # Guarded by ``_projection_allowed`` — non-None and 
additive.
+                    m.aggregation  # type: ignore[index]
+                ],
+            }
+            for m in query.metrics
+        }
+        df = aggregate(df, groupby=groupby, aggregates=aggregates)
+        df = _apply_order(df, query.order)
+        note_def = "Served from semantic view smart cache (re-aggregated 
locally)"
+
     if query.limit is not None:
         df = df.head(query.limit)
 
     table = pa.Table.from_pandas(df, preserve_index=False)
-    note = SemanticRequest(
-        type="cache",
-        definition="Served from semantic view smart cache (post-processed 
locally)",
-    )
+    note = SemanticRequest(type="cache", definition=note_def)
     return SemanticResult(requests=list(cached.requests) + [note], 
results=table)
 
 
+def _apply_order(
+    df: pd.DataFrame,
+    order: list[OrderTuple] | None,
+) -> pd.DataFrame:
+    if not order:
+        return df
+    available: list[tuple[str, bool]] = []
+    for element, direction in order:
+        col = _orderable_id_name(element)
+        if col in df.columns:
+            available.append((col, direction == OrderDirection.ASC))
+    if not available:
+        return df
+    cols = [col for col, _ in available]
+    asc = [a for _, a in available]
+    return df.sort_values(by=cols, ascending=asc).reset_index(drop=True)
+
+
+def _orderable_id_name(element: Metric | Dimension | AdhocExpression) -> str:
+    return getattr(element, "name", element.id)
+
+
 def _mask_for(df: pd.DataFrame, f: Filter) -> pd.Series:  # noqa: C901
     if f.column is None:
         return pd.Series(True, index=df.index)
diff --git a/tests/unit_tests/semantic_layers/cache_integration_test.py 
b/tests/unit_tests/semantic_layers/cache_integration_test.py
index 87b5e659042..c028c74f84e 100644
--- a/tests/unit_tests/semantic_layers/cache_integration_test.py
+++ b/tests/unit_tests/semantic_layers/cache_integration_test.py
@@ -28,6 +28,7 @@ import pyarrow as pa
 import pytest
 from pytest_mock import MockerFixture
 from superset_core.semantic_layers.types import (
+    AggregationType,
     Dimension,
     Metric,
     SemanticRequest,
@@ -189,3 +190,106 @@ def test_changed_on_invalidates_cache(
     datasource.changed_on = datetime(2026, 2, 1, 0, 0, 0)
     get_results(_qo(datasource, ">", 1))
     assert view_implementation.get_table.call_count == 2
+
+
+# ---------------------------------------------------------------------------
+# Projection (v2) — dropping a dimension and re-aggregating
+# ---------------------------------------------------------------------------
+
+
+def _make_view(metric_aggregation: AggregationType | None) -> tuple[Any, 
MagicMock]:
+    dim_b = Dimension(id="t.b", name="b", type=pa.utf8())
+    dim_c = Dimension(id="t.c", name="c", type=pa.utf8())
+    metric_x = Metric(
+        id="t.x",
+        name="x",
+        type=pa.float64(),
+        definition="sum(x)",
+        aggregation=metric_aggregation,
+    )
+    impl = MagicMock()
+    impl.metrics = {metric_x}
+    impl.dimensions = {dim_b, dim_c}
+    impl.features = frozenset()
+    impl.get_metrics = MagicMock(return_value={metric_x})
+    impl.get_dimensions = MagicMock(return_value={dim_b, dim_c})
+
+    ds = MagicMock()
+    ds.implementation = impl
+    ds.uuid = "proj-view"
+    ds.changed_on = datetime(2026, 3, 1, 0, 0, 0)
+    ds.cache_timeout = 60
+    ds.fetch_values_predicate = None
+    return impl, ds
+
+
+def _qo_dims(ds: MagicMock, columns: list[str]) -> ValidatedQueryObject:
+    return ValidatedQueryObject(
+        datasource=ds,
+        metrics=["x"],
+        columns=columns,  # type: ignore[arg-type]
+        filters=[],
+    )
+
+
+def _result_bc(rows: list[tuple[str, str, float]]) -> SemanticResult:
+    df = pd.DataFrame(rows, columns=["b", "c", "x"])
+    return SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="select b,c,sum(x)")],
+        results=pa.Table.from_pandas(df, preserve_index=False),
+    )
+
+
+def test_projection_reuses_cached_for_dropped_dim(
+    fake_cache: _InMemoryCache,
+) -> None:
+    impl, ds = _make_view(AggregationType.SUM)
+    impl.get_table = MagicMock(
+        return_value=_result_bc(
+            [("b1", "c1", 5.0), ("b1", "c2", 3.0), ("b2", "c1", 4.0)]
+        )
+    )
+
+    first = get_results(_qo_dims(ds, ["b", "c"]))
+    assert impl.get_table.call_count == 1
+    assert len(first.df) == 3
+
+    second = get_results(_qo_dims(ds, ["b"]))
+    assert impl.get_table.call_count == 1  # served via projection
+    df = second.df.sort_values("b").reset_index(drop=True)
+    assert df["b"].tolist() == ["b1", "b2"]
+    assert df["x"].tolist() == [8.0, 4.0]
+
+
+def test_projection_skipped_when_aggregation_unknown(
+    fake_cache: _InMemoryCache,
+) -> None:
+    impl, ds = _make_view(None)  # metric has no aggregation declared
+    impl.get_table = MagicMock(
+        side_effect=[
+            _result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0)]),
+            _result_bc([("b1", "c1", 5.0)]),  # what the SV would compute for 
[b]
+        ]
+    )
+
+    get_results(_qo_dims(ds, ["b", "c"]))
+    assert impl.get_table.call_count == 1
+
+    get_results(_qo_dims(ds, ["b"]))
+    assert impl.get_table.call_count == 2  # cannot project, re-executed
+
+
+def test_projection_skipped_for_avg(
+    fake_cache: _InMemoryCache,
+) -> None:
+    impl, ds = _make_view(AggregationType.AVG)
+    impl.get_table = MagicMock(
+        side_effect=[
+            _result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0)]),
+            _result_bc([("b1", "c1", 4.0)]),
+        ]
+    )
+
+    get_results(_qo_dims(ds, ["b", "c"]))
+    get_results(_qo_dims(ds, ["b"]))
+    assert impl.get_table.call_count == 2
diff --git a/tests/unit_tests/semantic_layers/cache_test.py 
b/tests/unit_tests/semantic_layers/cache_test.py
index bfe5cf948cc..c3634511cd5 100644
--- a/tests/unit_tests/semantic_layers/cache_test.py
+++ b/tests/unit_tests/semantic_layers/cache_test.py
@@ -24,6 +24,7 @@ import pandas as pd
 import pyarrow as pa
 import pytest
 from superset_core.semantic_layers.types import (
+    AggregationType,
     Dimension,
     Filter,
     Metric,
@@ -54,8 +55,18 @@ def dim(id_: str, name: str | None = None) -> Dimension:
     return Dimension(id=id_, name=name or id_, type=pa.utf8())
 
 
-def met(id_: str, name: str | None = None) -> Metric:
-    return Metric(id=id_, name=name or id_, type=pa.float64(), definition="x")
+def met(
+    id_: str,
+    name: str | None = None,
+    aggregation: AggregationType | None = None,
+) -> Metric:
+    return Metric(
+        id=id_,
+        name=name or id_,
+        type=pa.float64(),
+        definition="x",
+        aggregation=aggregation,
+    )
 
 
 COL_A = dim("col.a", "a")
@@ -95,10 +106,15 @@ def query(
 
 
 def entry_from(q: SemanticQuery, value_key_: str = "vk") -> CachedEntry:
-    from superset.semantic_layers.cache import _group_limit_key, _order_key
+    from superset.semantic_layers.cache import (
+        _dimension_key,
+        _group_limit_key,
+        _order_key,
+    )
 
     return CachedEntry(
         filters=frozenset(q.filters or set()),
+        dimension_keys=frozenset(_dimension_key(d) for d in q.dimensions),
         limit=q.limit,
         offset=q.offset or 0,
         order_key=_order_key(q.order),
@@ -193,15 +209,16 @@ def test_implies_like_exact_match_only() -> None:
 def test_can_satisfy_empty_cached_returns_all_as_leftovers() -> None:
     cached_q = query(filters=None)
     new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 5)})
-    ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
+    ok, leftovers, projection = can_satisfy(entry_from(cached_q), new_q)
     assert ok is True
+    assert projection is False
     assert leftovers == {where(COL_A, Operator.GREATER_THAN, 5)}
 
 
 def test_can_satisfy_narrower_filter() -> None:
     cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)})
     new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 2)})
-    ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
+    ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is True
     assert leftovers == {where(COL_A, Operator.GREATER_THAN, 2)}
 
@@ -209,7 +226,7 @@ def test_can_satisfy_narrower_filter() -> None:
 def test_can_satisfy_broader_filter_fails() -> None:
     cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 2)})
     new_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)})
-    ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
+    ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is False
     assert leftovers == set()
 
@@ -217,7 +234,7 @@ def test_can_satisfy_broader_filter_fails() -> None:
 def test_can_satisfy_missing_constraint_fails() -> None:
     cached_q = query(filters={where(COL_A, Operator.GREATER_THAN, 1)})
     new_q = query(filters=None)
-    ok, _ = can_satisfy(entry_from(cached_q), new_q)
+    ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is False
 
 
@@ -229,7 +246,7 @@ def test_can_satisfy_new_filter_on_extra_column() -> None:
             where(COL_B, Operator.EQUALS, "x"),
         }
     )
-    ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
+    ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is True
     assert leftovers == {
         where(COL_A, Operator.GREATER_THAN, 2),
@@ -244,7 +261,7 @@ def 
test_can_satisfy_leftover_on_non_projected_column_fails() -> None:
         filters={where(other, Operator.EQUALS, "x")},
         dimensions=[COL_A, COL_B],
     )
-    ok, _ = can_satisfy(entry_from(cached_q), new_q)
+    ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is False
 
 
@@ -252,8 +269,8 @@ def test_can_satisfy_having_requires_exact_set() -> None:
     cached_q = query(filters={having(M_X, Operator.GREATER_THAN, 100)})
     same = query(filters={having(M_X, Operator.GREATER_THAN, 100)})
     tighter = query(filters={having(M_X, Operator.GREATER_THAN, 200)})
-    ok_same, _ = can_satisfy(entry_from(cached_q), same)
-    ok_tight, _ = can_satisfy(entry_from(cached_q), tighter)
+    ok_same, _, _ = can_satisfy(entry_from(cached_q), same)
+    ok_tight, _, _ = can_satisfy(entry_from(cached_q), tighter)
     assert ok_same is True
     assert ok_tight is False
 
@@ -262,8 +279,8 @@ def test_can_satisfy_adhoc_requires_exact_set() -> None:
     cached_q = query(filters={adhoc("col_a > 1")})
     same = query(filters={adhoc("col_a > 1")})
     different = query(filters={adhoc("col_a > 2")})
-    ok_same, _ = can_satisfy(entry_from(cached_q), same)
-    ok_diff, _ = can_satisfy(entry_from(cached_q), different)
+    ok_same, _, _ = can_satisfy(entry_from(cached_q), same)
+    ok_diff, _, _ = can_satisfy(entry_from(cached_q), different)
     assert ok_same is True
     assert ok_diff is False
 
@@ -276,7 +293,7 @@ def test_can_satisfy_adhoc_requires_exact_set() -> None:
 def test_can_satisfy_unlimited_cached_satisfies_any_limit() -> None:
     cached_q = query(filters=None, limit=None)
     new_q = query(filters=None, limit=10)
-    ok, leftovers = can_satisfy(entry_from(cached_q), new_q)
+    ok, leftovers, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is True
     assert leftovers == set()
 
@@ -285,35 +302,35 @@ def test_can_satisfy_smaller_limit_with_matching_order() 
-> None:
     order = [(M_X, OrderDirection.DESC)]
     cached_q = query(filters=None, limit=100, order=order)
     new_q = query(filters=None, limit=10, order=order)
-    ok, _ = can_satisfy(entry_from(cached_q), new_q)
+    ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is True
 
 
 def test_can_satisfy_smaller_limit_different_order_fails() -> None:
     cached_q = query(filters=None, limit=100, order=[(M_X, 
OrderDirection.DESC)])
     new_q = query(filters=None, limit=10, order=[(M_X, OrderDirection.ASC)])
-    ok, _ = can_satisfy(entry_from(cached_q), new_q)
+    ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is False
 
 
 def test_can_satisfy_larger_limit_fails() -> None:
     cached_q = query(filters=None, limit=10)
     new_q = query(filters=None, limit=100)
-    ok, _ = can_satisfy(entry_from(cached_q), new_q)
+    ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is False
 
 
 def test_can_satisfy_no_new_limit_when_cached_has_one_fails() -> None:
     cached_q = query(filters=None, limit=100)
     new_q = query(filters=None, limit=None)
-    ok, _ = can_satisfy(entry_from(cached_q), new_q)
+    ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is False
 
 
 def test_can_satisfy_offset_never_reused() -> None:
     cached_q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], offset=5)
     new_q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], offset=5)
-    ok, _ = can_satisfy(entry_from(cached_q), new_q)
+    ok, _, _ = can_satisfy(entry_from(cached_q), new_q)
     assert ok is False
 
 
@@ -333,7 +350,7 @@ def test_apply_post_processing_filters_and_limits() -> None:
         limit=2,
     )
     result = _apply_post_processing(
-        cached, new_q, {where(COL_A, Operator.GREATER_THAN, 2)}
+        cached, new_q, {where(COL_A, Operator.GREATER_THAN, 2)}, False
     )
     result_df = result.results.to_pandas()
     assert list(result_df["a"]) == [3, 5]
@@ -347,7 +364,7 @@ def 
test_apply_post_processing_no_leftovers_no_limit_returns_original() -> None:
         requests=[], results=pa.Table.from_pandas(df, preserve_index=False)
     )
     new_q = query(filters=None, limit=None)
-    out = _apply_post_processing(cached, new_q, set())
+    out = _apply_post_processing(cached, new_q, set(), False)
     # same object reference is OK; we explicitly return the input
     assert out is cached
 
@@ -394,3 +411,261 @@ def test_value_key_with_datetime_filter() -> None:
     q = SemanticQuery(metrics=[M_X], dimensions=[COL_A], filters={f})
     # should not raise
     assert value_key(VIEW, q).startswith("sv:val:")
+
+
+def test_shape_key_independent_of_dimensions() -> None:
+    # The v2 shape key buckets entries by metric set only; different dimension
+    # sets share the same shape so the projection path can find broader 
entries.
+    q1 = SemanticQuery(metrics=[M_X], dimensions=[COL_A, COL_B])
+    q2 = SemanticQuery(metrics=[M_X], dimensions=[COL_A])
+    assert shape_key(VIEW, q1) == shape_key(VIEW, q2)
+    # Value keys still differ.
+    assert value_key(VIEW, q1) != value_key(VIEW, q2)
+
+
+# ---------------------------------------------------------------------------
+# Projection (v2)
+# ---------------------------------------------------------------------------
+
+
+M_SUM = met("met.sum", "sum_x", aggregation=AggregationType.SUM)
+M_COUNT = met("met.count", "count_x", aggregation=AggregationType.COUNT)
+M_MIN = met("met.min", "min_x", aggregation=AggregationType.MIN)
+M_MAX = met("met.max", "max_x", aggregation=AggregationType.MAX)
+M_AVG = met("met.avg", "avg_x", aggregation=AggregationType.AVG)
+M_UNKNOWN = met("met.unknown", "unknown_x", aggregation=None)
+
+
+def _projection_query(
+    metrics: list[Metric],
+    new_dimensions: list[Dimension],
+    cached_dimensions: list[Dimension],
+    cached_filters: set[Filter] | None = None,
+    cached_limit: int | None = None,
+    new_filters: set[Filter] | None = None,
+    new_limit: int | None = None,
+    new_order: Any = None,
+) -> tuple[CachedEntry, SemanticQuery]:
+    cached_q = SemanticQuery(
+        metrics=metrics,
+        dimensions=cached_dimensions,
+        filters=cached_filters,
+        limit=cached_limit,
+    )
+    new_q = SemanticQuery(
+        metrics=metrics,
+        dimensions=new_dimensions,
+        filters=new_filters,
+        limit=new_limit,
+        order=new_order,
+    )
+    return entry_from(cached_q), new_q
+
+
[email protected](
+    "metric,operator",
+    [
+        (M_SUM, "sum"),
+        (M_COUNT, "sum"),
+        (M_MIN, "min"),
+        (M_MAX, "max"),
+    ],
+)
+def test_can_satisfy_projection_each_additive_op(metric: Metric, operator: 
str) -> None:
+    entry, new_q = _projection_query(
+        metrics=[metric],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+    )
+    ok, leftovers, projection = can_satisfy(entry, new_q)
+    assert ok is True
+    assert projection is True
+    assert leftovers == set()
+
+
+def test_projection_rolls_up_sum() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+    )
+    cached_df = pd.DataFrame(
+        {"a": ["x", "x", "y", "y"], "b": [1, 2, 1, 2], "sum_x": [10, 20, 30, 
40]}
+    )
+    cached = SemanticResult(
+        requests=[SemanticRequest(type="SQL", definition="select ...")],
+        results=pa.Table.from_pandas(cached_df, preserve_index=False),
+    )
+    out = _apply_post_processing(cached, new_q, set(), True)
+    out_df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
+    assert list(out_df["a"]) == ["x", "y"]
+    assert list(out_df["sum_x"]) == [30, 70]
+
+
+def test_projection_rolls_up_min_max_count() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_MIN, M_MAX, M_COUNT],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+    )
+    cached_df = pd.DataFrame(
+        {
+            "a": ["x", "x", "y", "y"],
+            "b": [1, 2, 1, 2],
+            "min_x": [5, 2, 9, 8],
+            "max_x": [50, 60, 70, 80],
+            "count_x": [1, 1, 2, 3],
+        }
+    )
+    cached = SemanticResult(
+        requests=[],
+        results=pa.Table.from_pandas(cached_df, preserve_index=False),
+    )
+    out = _apply_post_processing(cached, new_q, set(), True)
+    df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
+    assert list(df["min_x"]) == [2, 8]
+    assert list(df["max_x"]) == [60, 80]
+    assert list(df["count_x"]) == [2, 5]
+
+
+def test_projection_drops_multiple_dims() -> None:
+    col_c = dim("col.c", "c")
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B, col_c],
+    )
+    cached_df = pd.DataFrame(
+        {
+            "a": ["x", "x", "x", "y"],
+            "b": [1, 1, 2, 1],
+            "c": [10, 20, 10, 10],
+            "sum_x": [1, 2, 3, 4],
+        }
+    )
+    cached = SemanticResult(
+        requests=[], results=pa.Table.from_pandas(cached_df, 
preserve_index=False)
+    )
+    out = _apply_post_processing(cached, new_q, set(), True)
+    df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
+    assert list(df["sum_x"]) == [6, 4]
+
+
+def test_projection_with_leftover_filter_then_rollup() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+        new_filters={where(COL_B, Operator.GREATER_THAN, 1)},
+    )
+    cached_df = pd.DataFrame(
+        {"a": ["x", "x", "y"], "b": [1, 2, 2], "sum_x": [10, 20, 30]}
+    )
+    cached = SemanticResult(
+        requests=[], results=pa.Table.from_pandas(cached_df, 
preserve_index=False)
+    )
+    ok, leftovers, projection = can_satisfy(entry, new_q)
+    assert ok is True
+    assert projection is True
+    out = _apply_post_processing(cached, new_q, leftovers, projection)
+    df = out.results.to_pandas().sort_values("a").reset_index(drop=True)
+    # b > 1 removes the (x,1) row; x sums to 20, y to 30
+    assert list(df["sum_x"]) == [20, 30]
+
+
+def test_projection_with_order_and_limit() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+        new_order=[(M_SUM, OrderDirection.DESC)],
+        new_limit=1,
+    )
+    cached_df = pd.DataFrame(
+        {"a": ["x", "x", "y"], "b": [1, 2, 1], "sum_x": [1, 2, 100]}
+    )
+    cached = SemanticResult(
+        requests=[], results=pa.Table.from_pandas(cached_df, 
preserve_index=False)
+    )
+    out = _apply_post_processing(cached, new_q, set(), True)
+    df = out.results.to_pandas()
+    assert len(df) == 1
+    assert df["a"].tolist() == ["y"]
+    assert df["sum_x"].tolist() == [100]
+
+
+def test_projection_rejected_when_metric_aggregation_unknown() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_UNKNOWN],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+    )
+    ok, _, _ = can_satisfy(entry, new_q)
+    assert ok is False
+
+
+def test_projection_rejected_for_avg() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_AVG],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+    )
+    ok, _, _ = can_satisfy(entry, new_q)
+    assert ok is False
+
+
+def test_projection_rejected_when_cached_has_limit() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+        cached_limit=10,
+    )
+    ok, _, _ = can_satisfy(entry, new_q)
+    assert ok is False
+
+
+def test_projection_rejected_when_cached_has_having() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+        cached_filters={having(M_SUM, Operator.GREATER_THAN, 10)},
+        new_filters={having(M_SUM, Operator.GREATER_THAN, 10)},
+    )
+    ok, _, _ = can_satisfy(entry, new_q)
+    assert ok is False
+
+
+def test_projection_rejected_when_order_references_dropped_dim() -> None:
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+        new_order=[(COL_B, OrderDirection.ASC)],
+    )
+    ok, _, _ = can_satisfy(entry, new_q)
+    assert ok is False
+
+
+def test_projection_rejected_when_cached_has_filter_on_dropped_dim() -> None:
+    # cached restricts c; rolling up to [a] would miss rows we'd need
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A],
+        cached_dimensions=[COL_A, COL_B],
+        cached_filters={where(COL_B, Operator.GREATER_THAN, 5)},
+    )
+    ok, _, _ = can_satisfy(entry, new_q)
+    assert ok is False
+
+
+def test_projection_rejected_when_cached_dims_subset_not_superset() -> None:
+    # cached has just [a]; new wants [a, b] — finer-grained data unavailable
+    entry, new_q = _projection_query(
+        metrics=[M_SUM],
+        new_dimensions=[COL_A, COL_B],
+        cached_dimensions=[COL_A],
+    )
+    ok, _, _ = can_satisfy(entry, new_q)
+    assert ok is False


Reply via email to