This is an automated email from the ASF dual-hosted git repository.

betodealmeida pushed a commit to branch sl-cache
in repository https://gitbox.apache.org/repos/asf/superset.git

commit 520401e23d628862a18f0652b991319acda3f5af
Author: Beto Dealmeida <[email protected]>
AuthorDate: Thu May 14 14:42:48 2026 -0400

    Fixes
---
 superset/charts/schemas.py                         |  5 ++
 superset/common/query_context_processor.py         |  1 +
 superset/common/utils/query_cache_manager.py       |  7 +++
 superset/models/helpers.py                         |  2 +
 superset/semantic_layers/cache.py                  | 27 ++++++++---
 superset/semantic_layers/mapper.py                 |  3 ++
 .../semantic_layers/cache_integration_test.py      | 44 ++++++++++++++++++
 tests/unit_tests/semantic_layers/cache_test.py     | 54 ++++++++++++++++++++--
 tests/unit_tests/semantic_layers/mapper_test.py    | 35 ++++++++++++++
 9 files changed, 169 insertions(+), 9 deletions(-)

diff --git a/superset/charts/schemas.py b/superset/charts/schemas.py
index e0cee7758c4..711bc382991 100644
--- a/superset/charts/schemas.py
+++ b/superset/charts/schemas.py
@@ -1497,6 +1497,11 @@ class ChartDataResponseResult(Schema):
         required=True,
         allow_none=None,
     )
+    semantic_cache_hit = fields.Boolean(
+        metadata={"description": "Whether the semantic layer smart cache was 
used"},
+        required=False,
+        allow_none=True,
+    )
     query = fields.String(
         metadata={
             "description": "The executed query statement. May be absent when "
diff --git a/superset/common/query_context_processor.py 
b/superset/common/query_context_processor.py
index 52fc6d24f28..bad78532051 100644
--- a/superset/common/query_context_processor.py
+++ b/superset/common/query_context_processor.py
@@ -202,6 +202,7 @@ class QueryContextProcessor:
             "annotation_data": cache.annotation_data,
             "error": cache.error_message,
             "is_cached": cache.is_cached,
+            "semantic_cache_hit": cache.semantic_cache_hit,
             "query": cache.query,
             "status": cache.status,
             "stacktrace": cache.stacktrace,
diff --git a/superset/common/utils/query_cache_manager.py 
b/superset/common/utils/query_cache_manager.py
index da2d668e8c9..bd6a4e1f14e 100644
--- a/superset/common/utils/query_cache_manager.py
+++ b/superset/common/utils/query_cache_manager.py
@@ -69,6 +69,7 @@ class QueryCacheManager:
         cache_value: dict[str, Any] | None = None,
         sql_rowcount: int | None = None,
         queried_dttm: str | None = None,
+        semantic_cache_hit: bool | None = None,
     ) -> None:
         self.df = df
         self.query = query
@@ -86,6 +87,7 @@ class QueryCacheManager:
         self.cache_value = cache_value
         self.sql_rowcount = sql_rowcount
         self.queried_dttm = queried_dttm
+        self.semantic_cache_hit = semantic_cache_hit
 
     # pylint: disable=too-many-arguments
     def set_query_result(
@@ -110,6 +112,7 @@ class QueryCacheManager:
             self.error_message = query_result.error_message
             self.df = query_result.df
             self.sql_rowcount = query_result.sql_rowcount
+            self.semantic_cache_hit = query_result.semantic_cache_hit
             self.annotation_data = {} if annotation_data is None else 
annotation_data
             self.queried_dttm = (
                 
datetime.now(tz=timezone.utc).replace(microsecond=0).isoformat()
@@ -131,6 +134,7 @@ class QueryCacheManager:
                 "rejected_filter_columns": self.rejected_filter_columns,
                 "annotation_data": self.annotation_data,
                 "sql_rowcount": self.sql_rowcount,
+                "semantic_cache_hit": self.semantic_cache_hit,
                 "queried_dttm": self.queried_dttm,
                 "dttm": self.queried_dttm,  # Backwards compatibility
             }
@@ -186,6 +190,9 @@ class QueryCacheManager:
                 query_cache.is_loaded = True
                 query_cache.is_cached = cache_value is not None
                 query_cache.sql_rowcount = cache_value.get("sql_rowcount", 
None)
+                query_cache.semantic_cache_hit = cache_value.get(
+                    "semantic_cache_hit", None
+                )
                 query_cache.cache_dttm = (
                     cache_value["dttm"] if cache_value is not None else None
                 )
diff --git a/superset/models/helpers.py b/superset/models/helpers.py
index 6634d25910e..dfbf58c50c8 100644
--- a/superset/models/helpers.py
+++ b/superset/models/helpers.py
@@ -673,6 +673,7 @@ class QueryResult:  # pylint: disable=too-few-public-methods
         errors: Optional[list[dict[str, Any]]] = None,
         from_dttm: Optional[datetime] = None,
         to_dttm: Optional[datetime] = None,
+        semantic_cache_hit: Optional[bool] = None,
     ) -> None:
         self.df = df
         self.query = query
@@ -685,6 +686,7 @@ class QueryResult:  # pylint: disable=too-few-public-methods
         self.errors = errors or []
         self.from_dttm = from_dttm
         self.to_dttm = to_dttm
+        self.semantic_cache_hit = semantic_cache_hit
         self.sql_rowcount = len(self.df.index) if not self.df.empty else 0
 
 
diff --git a/superset/semantic_layers/cache.py 
b/superset/semantic_layers/cache.py
index a57db368c3f..e45c4717482 100644
--- a/superset/semantic_layers/cache.py
+++ b/superset/semantic_layers/cache.py
@@ -131,6 +131,13 @@ def try_serve_from_cache(
                     if payload is None:
                         # value evicted but index entry survived; drop it
                         continue
+                    if projection_needed and not _projection_input_complete(
+                        entry, payload
+                    ):
+                        # Cached result may be truncated (top-N). Keep the 
index
+                        # entry alive but skip reuse for projection.
+                        pruned.append(entry)
+                        continue
                     pruned.append(entry)
                     served = _apply_post_processing(
                         payload, query, leftovers, projection_needed
@@ -298,7 +305,7 @@ def can_satisfy(  # noqa: C901
         projection_needed = False
     elif cached_dim_keys > new_dim_keys:
         projection_needed = True
-        if not _projection_allowed(entry, query, new_dim_keys, 
cached_dim_keys):
+        if not _projection_allowed(entry, query):
             return False, set(), False
     else:
         return False, set(), False
@@ -363,17 +370,12 @@ def can_satisfy(  # noqa: C901
 def _projection_allowed(
     entry: CachedEntry,
     query: SemanticQuery,
-    new_dim_keys: frozenset[str],
-    cached_dim_keys: frozenset[str],
 ) -> bool:
     """
     Gates for the projection path (above and beyond filter containment).
     """
     if any(m.aggregation not in ADDITIVE_AGGREGATIONS for m in query.metrics):
         return False
-    # Cached truncation makes the rollup unsafe (we're missing rows).
-    if entry.limit is not None:
-        return False
     if entry.group_limit_key:
         return False
     if query.group_limit is not None:
@@ -385,6 +387,19 @@ def _projection_allowed(
     return True
 
 
+def _projection_input_complete(entry: CachedEntry, payload: SemanticResult) -> 
bool:
+    """
+    True when a projection source is guaranteed not to be limit-truncated.
+
+    If a cached query had ``limit=N`` and returned exactly ``N`` rows, there 
might
+    be additional source rows that were cut off. We only reuse it for 
projection
+    when the payload row count is strictly less than ``N``.
+    """
+    if entry.limit is None:
+        return True
+    return payload.results.num_rows < entry.limit
+
+
 def _filter_col_id(f: Filter) -> str | None:
     return f.column.id if f.column is not None else None
 
diff --git a/superset/semantic_layers/mapper.py 
b/superset/semantic_layers/mapper.py
index 515a2d38fc9..a4afff69d51 100644
--- a/superset/semantic_layers/mapper.py
+++ b/superset/semantic_layers/mapper.py
@@ -264,6 +264,8 @@ def map_semantic_result_to_query_result(
             f"-- {req.type}\n{req.definition}" for req in 
semantic_result.requests
         )
 
+    semantic_cache_hit = any(req.type == "cache" for req in 
semantic_result.requests)
+
     return QueryResult(
         # Core data
         df=semantic_result.results.to_pandas(),
@@ -284,6 +286,7 @@ def map_semantic_result_to_query_result(
         # Time range - pass through from original query_object
         from_dttm=query_object.from_dttm,
         to_dttm=query_object.to_dttm,
+        semantic_cache_hit=semantic_cache_hit,
     )
 
 
diff --git a/tests/unit_tests/semantic_layers/cache_integration_test.py 
b/tests/unit_tests/semantic_layers/cache_integration_test.py
index c028c74f84e..432bbdfba6c 100644
--- a/tests/unit_tests/semantic_layers/cache_integration_test.py
+++ b/tests/unit_tests/semantic_layers/cache_integration_test.py
@@ -293,3 +293,47 @@ def test_projection_skipped_for_avg(
     get_results(_qo_dims(ds, ["b", "c"]))
     get_results(_qo_dims(ds, ["b"]))
     assert impl.get_table.call_count == 2
+
+
+def test_projection_reuses_when_cached_limit_not_reached(
+    fake_cache: _InMemoryCache,
+) -> None:
+    impl, ds = _make_view(AggregationType.SUM)
+    impl.get_table = MagicMock(
+        return_value=_result_bc(
+            [("b1", "c1", 5.0), ("b1", "c2", 3.0), ("b2", "c1", 4.0)]
+        )
+    )
+
+    first = get_results(_qo_dims(ds, ["b", "c"]))
+    assert impl.get_table.call_count == 1
+    assert len(first.df) == 3
+
+    second = get_results(_qo_dims(ds, ["b"]))
+    assert impl.get_table.call_count == 1  # served via projection
+    df = second.df.sort_values("b").reset_index(drop=True)
+    assert df["b"].tolist() == ["b1", "b2"]
+    assert df["x"].tolist() == [8.0, 4.0]
+
+
+def test_projection_skips_when_cached_limit_reached(
+    fake_cache: _InMemoryCache,
+) -> None:
+    impl, ds = _make_view(AggregationType.SUM)
+
+    first_q = _qo_dims(ds, ["b", "c"])
+    first_q.row_limit = 3
+    second_q = _qo_dims(ds, ["b"])
+
+    impl.get_table = MagicMock(
+        side_effect=[
+            _result_bc([("b1", "c1", 5.0), ("b1", "c2", 3.0), ("b2", "c1", 
4.0)]),
+            _result_bc([("b1", "c1", 8.0), ("b2", "c1", 4.0)]),
+        ]
+    )
+
+    get_results(first_q)
+    assert impl.get_table.call_count == 1
+
+    get_results(second_q)
+    assert impl.get_table.call_count == 2  # projection skipped; re-executed
diff --git a/tests/unit_tests/semantic_layers/cache_test.py 
b/tests/unit_tests/semantic_layers/cache_test.py
index 6a43168d8d6..985649f111f 100644
--- a/tests/unit_tests/semantic_layers/cache_test.py
+++ b/tests/unit_tests/semantic_layers/cache_test.py
@@ -40,6 +40,7 @@ from superset_core.semantic_layers.types import (
 from superset.semantic_layers.cache import (
     _apply_post_processing,
     _implies,
+    _projection_input_complete,
     CachedEntry,
     can_satisfy,
     shape_key,
@@ -635,15 +636,62 @@ def test_projection_rejected_for_avg() -> None:
     assert ok is False
 
 
-def test_projection_rejected_when_cached_has_limit() -> None:
+def test_projection_with_cached_limit_defers_to_runtime_rowcount_check() -> 
None:
     entry, new_q = _projection_query(
         metrics=[M_SUM],
         new_dimensions=[COL_A],
         cached_dimensions=[COL_A, COL_B],
         cached_limit=10,
     )
-    ok, _, _ = can_satisfy(entry, new_q)
-    assert ok is False
+    ok, leftovers, projection = can_satisfy(entry, new_q)
+    assert ok is True
+    assert leftovers == set()
+    assert projection is True
+
+
+def test_projection_input_complete_unlimited_cached() -> None:
+    entry = entry_from(
+        SemanticQuery(metrics=[M_SUM], dimensions=[COL_A, COL_B], limit=None)
+    )
+    payload = SemanticResult(
+        requests=[],
+        results=pa.Table.from_pydict({"a": ["x"], "b": [1], "sum_x": [1.0]}),
+    )
+    assert _projection_input_complete(entry, payload) is True
+
+
+def test_projection_input_complete_limited_cached_short_page() -> None:
+    entry = entry_from(
+        SemanticQuery(metrics=[M_SUM], dimensions=[COL_A, COL_B], limit=10)
+    )
+    payload = SemanticResult(
+        requests=[],
+        results=pa.Table.from_pydict(
+            {
+                "a": ["x", "y", "z"],
+                "b": [1, 1, 1],
+                "sum_x": [1.0, 2.0, 3.0],
+            }
+        ),
+    )
+    assert _projection_input_complete(entry, payload) is True
+
+
+def test_projection_input_complete_limited_cached_full_page() -> None:
+    entry = entry_from(
+        SemanticQuery(metrics=[M_SUM], dimensions=[COL_A, COL_B], limit=3)
+    )
+    payload = SemanticResult(
+        requests=[],
+        results=pa.Table.from_pydict(
+            {
+                "a": ["x", "y", "z"],
+                "b": [1, 1, 1],
+                "sum_x": [1.0, 2.0, 3.0],
+            }
+        ),
+    )
+    assert _projection_input_complete(entry, payload) is False
 
 
 def test_projection_rejected_when_cached_has_having() -> None:
diff --git a/tests/unit_tests/semantic_layers/mapper_test.py 
b/tests/unit_tests/semantic_layers/mapper_test.py
index 7abdce91a6e..3ab64251ae8 100644
--- a/tests/unit_tests/semantic_layers/mapper_test.py
+++ b/tests/unit_tests/semantic_layers/mapper_test.py
@@ -1251,6 +1251,41 @@ def test_get_results_without_time_offsets(
 
     # Verify DataFrame matches main query result
     pd.testing.assert_frame_equal(result.df, main_df)
+    assert result.semantic_cache_hit is False
+
+
+def test_get_results_marks_semantic_cache_hit_from_requests(
+    mock_datasource: MagicMock,
+    mocker: MockerFixture,
+) -> None:
+    main_df = pd.DataFrame({"category": ["A"], "total_sales": [1.0]})
+    cached_result = SemanticResult(
+        requests=[
+            SemanticRequest(type="SQL", definition="SELECT ..."),
+            SemanticRequest(
+                type="cache",
+                definition=(
+                    "Served from semantic view smart cache (re-aggregated 
locally)"
+                ),
+            ),
+        ],
+        results=pa.Table.from_pandas(main_df),
+    )
+
+    mock_datasource.implementation.get_table = 
mocker.Mock(return_value=cached_result)
+
+    query_object = ValidatedQueryObject(
+        datasource=mock_datasource,
+        from_dttm=datetime(2025, 10, 15),
+        to_dttm=datetime(2025, 10, 22),
+        metrics=["total_sales"],
+        columns=["category"],
+        granularity="order_date",
+    )
+
+    result = get_results(query_object)
+
+    assert result.semantic_cache_hit is True
 
 
 def test_get_results_with_single_time_offset(

Reply via email to