hongkunxu opened a new pull request, #18529: URL: https://github.com/apache/pinot/pull/18529
# Part 2, Materialized View: Query Rewrite

> **Depends on:** _<link to Part 1 PR>_ (the metadata model, consistency manager, analyzer, and Minion task framework). This PR will not compile or run without it.

## Summary

This PR adds the broker-side query rewrite half of Apache Pinot's Materialized View (MV) feature for the Single-Stage Query Engine (SSE). Once an MV is built and kept fresh by the Part 1 framework, this PR lets the broker **transparently rewrite** a user's query against the base table into an equivalent query against the MV. Users do not change their SQL.

**Key capabilities:**

- Transparent query rewrite at the broker layer, with no SQL changes required from users
- Per-MV opt-in flag (`rewriteEnabled`) and per-MV staleness SLO (`stalenessThresholdMs`), both stored on the MV definition (extends Part 1's `MaterializedViewDefinitionMetadata`)
- Pluggable subsumption strategies for exact match, projection subset, and aggregation rollup
- Hybrid execution mode that splits a query between the MV (historical, `ts < watermarkMs`) and the base table (real-time, `ts >= watermarkMs`) and merges the results automatically
- Master broker switch (`pinot.broker.query.enable.materialized.view.rewrite`, `false` by default), so upgrades are safe: behavior does not change until an operator opts in
- New `materializedViewQueried` field on `BrokerResponseNative` so operators can see which MV served each query

## Design Doc

https://docs.google.com/document/d/1ToJfN42IMNySEY8YODb99Beis9YpLa8A8OWLcPcvG0M/edit?usp=sharing

## Architecture (rewrite path)

The diagram in the companion PR (Part 1) shows the full end-to-end system. This PR delivers the broker-side pieces:

- `MaterializedViewQueryRewriteEngine`
- `MaterializedViewMetadataCache`
- the eligibility gate
- the strategy chain
- the FULL / SPLIT execution-mode branching
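The eligibility gate, strategy chain, and FULL / SPLIT branching listed above can be pictured as one decision flow. The Java sketch below is a simplified illustration under stated assumptions and is not the PR's code:

- `MvState`, `RewritePlan`, `SubsumptionStrategy`, `RewriteFlowSketch`, and the `BASE_ONLY` mode are hypothetical names.
- A query is reduced to a string plus a `[startMs, endMs)` time window.
- The FULL vs. SPLIT rule is an assumed reading of the Execution Modes section.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins; not the PR's actual classes.
record MvState(boolean rewriteEnabled, long watermarkMs, long stalenessThresholdMs) {}

record RewritePlan(String strategy, int cost) {}

// Hypothetical strategy interface: returns a plan if the MV can answer the query.
interface SubsumptionStrategy {
  Optional<RewritePlan> tryRewrite(String query);
}

final class RewriteFlowSketch {
  // BASE_ONLY is an illustrative label for "serve from the base table unchanged".
  enum Mode { BASE_ONLY, FULL_REWRITE, SPLIT_REWRITE }

  private RewriteFlowSketch() {}

  // Gate checks 1-4, in the order listed in the Eligibility Gate section.
  static boolean isEligible(boolean masterSwitch, MvState mv, long nowMs) {
    if (!masterSwitch) {
      return false; // 1. broker master switch
    }
    if (!mv.rewriteEnabled()) {
      return false; // 2. per-MV kill switch
    }
    if (mv.watermarkMs() <= 0) {
      return false; // 3. no VALID partition yet
    }
    // 4. staleness SLO, enforced only when a threshold is configured
    return mv.stalenessThresholdMs() <= 0
        || nowMs - mv.watermarkMs() <= mv.stalenessThresholdMs();
  }

  // Gate check 5: try every strategy and keep the cheapest matching plan.
  static Optional<RewritePlan> cheapestPlan(List<SubsumptionStrategy> strategies, String query) {
    return strategies.stream()
        .map(strategy -> strategy.tryRewrite(query))
        .flatMap(Optional::stream)
        .min(Comparator.comparingInt(RewritePlan::cost));
  }

  // Assumed mode rule for a query window [startMs, endMs): FULL if it lies entirely
  // below the watermark, BASE_ONLY if entirely at or above it, SPLIT otherwise.
  static Mode chooseMode(boolean eligible, Optional<RewritePlan> plan,
                         long startMs, long endMs, long watermarkMs) {
    if (!eligible || plan.isEmpty()) {
      return Mode.BASE_ONLY;
    }
    if (endMs <= watermarkMs) {
      return Mode.FULL_REWRITE;
    }
    if (startMs >= watermarkMs) {
      return Mode.BASE_ONLY;
    }
    return Mode.SPLIT_REWRITE;
  }
}
```

The cheap metadata checks (1 to 4) run first. As a result, a disabled or stale MV is rejected before any subsumption matching runs, which mirrors the order given in the Eligibility Gate section.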
## Key Components

| Component | Module | Description |
|---|---|---|
| `MaterializedViewHandler` (interface) + `DefaultMaterializedViewHandler` | `pinot-materialized-view` | Pluggable broker-side rewrite SPI, loaded reflectively by `loadHandler(...)` |
| `MaterializedViewQueryRewriteEngine` | `pinot-materialized-view` | Walks the subsumption strategies and picks the lowest-cost plan; runs the eligibility gate (`rewriteEnabled` + `watermarkMs > 0` + staleness SLO) |
| `MaterializedViewMetadataCache` | `pinot-materialized-view` | Broker-side reverse index from the raw base-table name to MV cache entries; pre-computes `viewProjectionMap` and the flattened-AND filter conjuncts; uses two ZooKeeper listeners, one on the definition path (changes infrequently) and one on the runtime path (updated per task) |
| `ExactSubsumptionStrategy` | `pinot-materialized-view` | Matches queries identical to the MV definition (cost ≈ 1) |
| `ScanSubsumptionStrategy` | `pinot-materialized-view` | Matches scan queries whose projection is a subset of the MV's (cost ≈ 2) |
| `AggregationSubsumptionStrategy` | `pinot-materialized-view` | Matches aggregation queries via re-aggregation over MV columns (cost ≈ 3) |
| `AggregationEquivalenceRegistry` | `pinot-materialized-view` | Maps base aggregations (SUM, MIN, MAX, COUNT, HLL/HLL+/Theta) to their MV-side re-aggregation; the Part 1 analyzer is extended to reject MVs whose aggregations are not registered here |

## Configuration Reference

### Broker config

| Key | Default | Description |
|---|---|---|
| `pinot.broker.query.enable.materialized.view.rewrite` | `false` | Master switch for MV rewrite. Must be enabled for the broker to rewrite any query; the default-off setting makes upgrades safe. |
| `pinot.broker.materialized.view.handler.class` | `DefaultMaterializedViewHandler` | Pluggable handler class, loaded via `Class.forName`. |

### Per-MV opt-in (no per-query option)

V2 moves opt-in from a per-query `useMaterializedView` option to a per-MV-table flag on the MV's `materializedViewConfig`.
This PR adds these two fields to the existing `MaterializedViewDefinitionMetadata` (Part 1):

| Field | Default | Description |
|---|---|---|
| `rewriteEnabled` | `true` | Operator kill switch. Set to `false` to keep ingestion running while temporarily routing all queries to the base table (e.g. during MV migration). |
| `stalenessThresholdMs` | `0` (no SLO) | The broker excludes the MV from rewrite when `now − watermarkMs > stalenessThresholdMs`. Set this to bound the maximum age of MV-served data. |

## Execution Modes

- **`FULL_REWRITE`**: the entire query can be answered from the MV, so the broker routes it to the MV table only.
- **`SPLIT_REWRITE`**: the query window crosses `watermarkMs`. The broker fires two parallel sub-queries:
  - MV side: `ts < watermarkMs`
  - Base side: `ts >= watermarkMs`
  - The two results are merged at the broker (`BrokerReduceService` MV branch).

V1 uses a single split point (`watermarkMs`). Per-bucket N-way routing is deferred to V2. That mode would use the full `partitions` map for fine-grained MV/base interleaving. The persistent state shape from Part 1 is already forward-compatible.

## Eligibility Gate

For each query against a base table that has a registered MV, the broker evaluates, in order:

1. The master broker switch is enabled.
2. The MV's `rewriteEnabled` is `true`.
3. The MV's `watermarkMs > 0` (at least one VALID partition exists).
4. `now − watermarkMs <= stalenessThresholdMs` (checked only when `stalenessThresholdMs > 0`).
5. The strategy chain finds a matching subsumption rule.

If any check fails, the query is served from the base table unchanged.

## Quick Start

```
bin/pinot-admin.sh QuickStart -type MATERIALIZED_VIEW
```

This is the same quickstart as Part 1, with broker-side rewrite now active.
The following query targets `airlineStats`, but the broker transparently rewrites it to run against `airlineStatsMv`:

```sql
SELECT Carrier,
       SUM(ArrDelay) AS total_delay,
       COUNT(*) AS flights
FROM airlineStats
GROUP BY Carrier
ORDER BY total_delay DESC
LIMIT 10;
```

The response includes `"materializedViewQueried": "airlineStatsMv_OFFLINE"`, so operators can see which MV served the query.

## Query Rewrite Examples

### 1. ExactSubsumption: scan projection

Hits the scan MV. The query exactly matches the scan MV definition.

```sql
SELECT DaysSinceEpoch, Carrier, Origin, Dest, DestCityName
FROM airlineStats
```

### 2. ScanSubsumption: projection subset with residual filter

Hits the scan MV. The user projection ⊂ MV projection, and `WHERE Dest = 'IAH'` becomes a residual filter on the MV.

```sql
SELECT DaysSinceEpoch, Origin, Dest, DestCityName
FROM airlineStats
WHERE Dest = 'IAH'
```

### 3. ExactSubsumption: aggregation query

Hits the aggregation MV. The query exactly matches the MV definition (same GROUP BY and same aggregations).

```sql
SELECT DaysSinceEpoch, Carrier, Origin, Dest, DestCityName,
       SUM(ArrDelayMinutes) AS ArrDelayMinutes_sum,
       SUM(Cancelled) AS Cancelled_sum
FROM airlineStats
GROUP BY DaysSinceEpoch, Carrier, Origin, Dest, DestCityName
```

### 4. AggSubsumption: re-aggregation with fewer GROUP BY keys

Hits the aggregation MV. The user GROUP BY ⊂ MV GROUP BY (it drops `DestCityName`), so the query is rewritten to `SUM(ArrDelayMinutes_sum)` over the MV.

```sql
SELECT DaysSinceEpoch, Carrier, Origin, Dest,
       SUM(ArrDelayMinutes) AS ArrDelayMinutes_sum
FROM airlineStats
GROUP BY DaysSinceEpoch, Origin, Carrier, Dest
```

### 5. AggSubsumption + SketchMergeEquivalence: HLL rewrite

Hits the HLL MV. `DISTINCTCOUNTHLL(TailNum)` on the base table is rewritten to a sketch merge over the MV's `DISTINCTCOUNTRAWHLL` column `hll_tailnum`.
```sql
SELECT Origin, Dest, DISTINCTCOUNTHLL(TailNum) AS hll_tailnum
FROM airlineStats
GROUP BY Origin, Dest
```

## Common Errors

| Error message (excerpt) | When | How to fix |
|---|---|---|
| `aggregation '<X>' for which no re-aggregation rule is registered` | At MV creation time, when the MV uses an aggregation that the broker would have no way to re-aggregate at query time | Use SUM/MIN/MAX/COUNT or one of the registered sketch families (HLL, HLL+, Theta). AVG, PERCENTILE, and similar aggregations are not supported. |

(For ingestion-time errors such as bucketTimePeriod, LIMIT, OFFSET, nested SELECT, or a mutable source, see the companion PR.)

## Operational Notes

- **Upgrade order (full feature):** controller → minion (Part 1) → broker (this PR). The broker-side rewrite defaults to OFF (`pinot.broker.query.enable.materialized.view.rewrite=false`). Existing clusters therefore see no behavior change after the upgrade until an operator opts in.
- **Wire-format additions** (backward-compatible, since old readers ignore unknown fields):
  - New fields on `MaterializedViewDefinitionMetadata`: `rewriteEnabled`, `stalenessThresholdMs`.
  - New `BrokerResponseNative` field `materializedViewQueried`. It is annotated with Jackson `@JsonInclude(NON_NULL)`, so it is omitted from the response when no MV served the query.

## Testing

- **Unit tests**:
  - Subsumption strategies: `ExactSubsumptionStrategyTest`, `ScanSubsumptionStrategyTest`, `AggregationSubsumptionStrategyTest`
  - Query rewrite engine: `MaterializedViewQueryRewriteEngineTest`
  - Broker metadata cache: `MaterializedViewMetadataCacheTest`
  - Broker reduce merge for SPLIT: `BrokerReduceServiceTest`
  - Aggregation equivalence registry: `AggregationEquivalenceRegistryTest`
  - MV analyzer extension: aggregation-eligibility cases added to `MaterializedViewAnalyzerTest`
- **Integration test**: the rewrite half of `MaterializedViewClusterIntegrationTest`. It covers end to end:
  - query rewrite (FULL and SPLIT modes)
  - the `rewriteEnabled` kill switch
  - staleness SLO eviction
  - the `materializedViewQueried` response field
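The SPLIT-mode merge exercised by `BrokerReduceServiceTest` can be illustrated with the Quick Start query: `SUM(ArrDelay)` and `COUNT(*)` grouped by `Carrier`. This is a hypothetical sketch, not the PR's `BrokerReduceService` code:

- `SplitMergeSketch`, `Partial`, and `topKeys` are illustrative names.
- Each side's result is assumed to arrive as a map from group key to partial aggregates.

The MV side covers `ts < watermarkMs` and the base side covers `ts >= watermarkMs`. No row is counted twice, so per-key partials can simply be added.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a SPLIT-mode merge; names are illustrative, not the PR's API.
final class SplitMergeSketch {
  // Partial results for one GROUP BY key: SUM(ArrDelay) and COUNT(*).
  record Partial(long totalDelay, long flights) {
    Partial plus(Partial other) {
      return new Partial(totalDelay + other.totalDelay, flights + other.flights);
    }
  }

  private SplitMergeSketch() {}

  // The two sides cover disjoint time ranges, so per-key partials are added together.
  static Map<String, Partial> merge(Map<String, Partial> mvSide, Map<String, Partial> baseSide) {
    Map<String, Partial> merged = new HashMap<>(mvSide);
    baseSide.forEach((key, partial) -> merged.merge(key, partial, Partial::plus));
    return merged;
  }

  // ORDER BY total_delay DESC LIMIT n, applied only after the merge.
  static List<String> topKeys(Map<String, Partial> merged, int limit) {
    List<Map.Entry<String, Partial>> entries = new ArrayList<>(merged.entrySet());
    entries.sort(Comparator.comparingLong(
        (Map.Entry<String, Partial> e) -> e.getValue().totalDelay()).reversed());
    List<String> keys = new ArrayList<>();
    for (Map.Entry<String, Partial> e : entries.subList(0, Math.min(limit, entries.size()))) {
      keys.add(e.getKey());
    }
    return keys;
  }
}
```

The sketch applies ORDER BY and LIMIT only after merging. Truncating each side to its own top N first could drop a group that ranks highly only once both sides are combined.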
## Modules Affected

- `pinot-broker`: broker factories for MV-aware single-stage / gRPC handlers
- `pinot-common`: the `BrokerResponseNative.materializedViewQueried` field
- `pinot-core`: broker reduce branch for MV-rewritten queries
- `pinot-materialized-view`:
  - query rewrite engine, handler SPI, subsumption strategies, and aggregation equivalence registry
  - extends Part 1's `MaterializedViewDefinitionMetadata` with `rewriteEnabled` / `stalenessThresholdMs`
  - extends `MaterializedViewAnalyzer` with the aggregation-eligibility check
- `pinot-spi`: config-key constants (`pinot.broker.query.enable.materialized.view.rewrite`, `pinot.broker.materialized.view.handler.*`)

## Limitations / Deferred to V2

- Applies only to the SSE engine; MSE rewrite is out of scope.
- N-way per-bucket broker routing is deferred to V2, once V1 has production telemetry. That routing would use the full `partitions` map for fine-grained MV/base interleaving instead of the single `watermarkMs` split point. The persistent state shape is already forward-compatible.
- AVG, PERCENTILE, and other non-distributive aggregations are not supported. The analyzer rejects MVs that use them; only SUM/MIN/MAX/COUNT and the HLL/HLL+/Theta sketches have re-aggregation rules.
- No per-query opt-in or override. The feature is governed entirely by the broker master switch plus the per-MV `rewriteEnabled` flag.
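The restriction to distributive aggregations can be illustrated with a much-reduced, hypothetical stand-in for `AggregationEquivalenceRegistry`. This sketch is not the PR's registry:

- `ReAggregationSketch`, its rule map, and its methods are illustrative.
- The HLL/HLL+/Theta sketch-merge rules are left out.
- The error text reuses the excerpt from the Common Errors table.

Each base aggregation maps to the function applied over the MV's pre-aggregated column, as in example 4, where `SUM(ArrDelayMinutes)` becomes `SUM(ArrDelayMinutes_sum)`.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical, reduced registry sketch; not the PR's AggregationEquivalenceRegistry.
final class ReAggregationSketch {
  // Base-table aggregation -> function applied to the MV's pre-aggregated column.
  // COUNT rolls up by summing the partial counts stored in the MV.
  private static final Map<String, String> RULES = Map.of(
      "SUM", "SUM",
      "MIN", "MIN",
      "MAX", "MAX",
      "COUNT", "SUM");

  private ReAggregationSketch() {}

  // Builds the MV-side expression, or rejects an aggregation that has no rule.
  static String reAggregate(String baseFunction, String mvColumn) {
    String rollup = RULES.get(baseFunction.toUpperCase(Locale.ROOT));
    if (rollup == null) {
      throw new IllegalArgumentException(
          "aggregation '" + baseFunction + "' for which no re-aggregation rule is registered");
    }
    return rollup + "(" + mvColumn + ")";
  }

  // Why AVG has no rule: averaging per-bucket averages ignores how many rows each bucket holds.
  static double averageOfAverages(double... bucketAverages) {
    double total = 0;
    for (double avg : bucketAverages) {
      total += avg;
    }
    return total / bucketAverages.length;
  }
}
```

Consider two buckets: one holds a single flight delayed 10 minutes (average 10), and the other holds nine on-time flights (average 0). `averageOfAverages(10.0, 0.0)` returns 5.0, while the true average over all ten flights is 1.0. SUM, MIN, MAX, and COUNT do not have this problem, which is why they can be rolled up directly from MV columns.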
