hongkunxu opened a new pull request, #18544:
URL: https://github.com/apache/pinot/pull/18544

   ## Summary
   
   Adds Pinot-native SQL DDL for managing **Materialized Views** end-to-end, 
served through the existing `POST /sql/ddl` controller endpoint. Three new 
statements:
   
   ```sql
   CREATE MATERIALIZED VIEW [IF NOT EXISTS] [db.]name (
       col TYPE [DIMENSION|METRIC|DATETIME FORMAT '..' GRANULARITY '..']
           [NOT NULL|NULL] [DEFAULT lit], ...
   )
   [REFRESH EVERY '<period>']
   PROPERTIES ('timeColumnName' = 't', 'bucketTimePeriod' = '1d', ...)
   AS <Pinot SELECT>
   
   SHOW CREATE MATERIALIZED VIEW [db.]name        -- no TYPE clause (MV is always OFFLINE)
   DROP MATERIALIZED VIEW [IF EXISTS] [db.]name   -- no TYPE clause
   ```
   
   Before this PR, MVs could only be managed through the JSON `POST /tables` path, which is verbose, error-prone, and awkward to copy-paste between environments. This PR makes MVs a first-class SQL DDL surface with a deterministic round-trip: `CREATE` → ZK → `SHOW CREATE` produces a statement that re-parses to the same `TableConfig`.
   
   Labels: `feature`, `release-notes` (new SQL grammar / new controller error 
contracts), `testing`.
   
   ## Syntax reference
   
   ### `REFRESH EVERY '<period>'`
   
   Optional. When present, writes a per-MV Quartz cron expression to `task.MaterializedViewTask.schedule`. When omitted, no per-table cron is written and the MV runs on the cluster-wide `PinotTaskManager` schedule. `<period>` is a positive integer followed by a single unit character, `m`, `h`, or `d`:
   
   | Period | Allowed N  | Quartz cron emitted                | Round-trips via `SHOW CREATE` |
   |--------|------------|------------------------------------|-------------------------------|
   | `'1m'` | N = 1      | `0 * * * * ?`                      | yes |
   | `'Nm'` | 2 ≤ N ≤ 59 | `0 0/N * * * ?`                    | yes |
   | `'1h'` | N = 1      | `0 0 * * * ?`                      | yes |
   | `'Nh'` | 2 ≤ N ≤ 23 | `0 0 0/N * * ?`                    | yes |
   | `'1d'` | N = 1      | `0 0 0 * * ?`                      | yes |
   | `'Nd'` | 2 ≤ N ≤ 28 | `0 0 0 1/N * ?` (day-of-month 1/N) | yes |

   The `'Nd'` form steps through the day of the month starting at the 1st, so the schedule restarts on the 1st of every month and the last interval of a month can be shorter than N days.
   
   Rejected at parse / compile time (HTTP 400):
   
   - `'60m'` and above, `'24h'` and above: use the next-larger unit (for example `'1h'` instead of `'60m'`). `'29d'` and above: rejected outright, since there is no larger unit.
   - `'0d'` / `'-1m'` / `'1.5h'`: non-positive or non-integer.
   - `'1w'` / `'1y'` / `'30s'` / `'every Monday'`: unsupported unit.
   - Hand-written cron expressions: this PR adds no DDL syntax for arbitrary cron. Operators who need irregular schedules use the legacy JSON API, and `SHOW CREATE MATERIALIZED VIEW` on such an MV returns 400 rather than silently emitting a DDL that cannot round-trip.
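   The period-to-cron mapping in the table above, together with the round-trip check that makes `SHOW CREATE` refuse non-canonical crons, fits in a few lines of Java. This is a minimal sketch under stated assumptions, not the PR's `MaterializedViewPropertyRouter`: the class is hypothetical, `cronToPeriod` only borrows the PR's method name, and the inverse is computed by enumerating every legal period rather than by parsing the cron string.

   ```java
   import java.util.Optional;
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Hypothetical sketch of REFRESH EVERY '<period>' <-> Quartz cron; not the PR's router.
   final class RefreshPeriodSketch {
     // One to three digits plus one unit char: rejects '1.5h', '-1m', '1w', '30s', 'every Monday'.
     private static final Pattern PERIOD = Pattern.compile("(\\d{1,3})([mhd])");
     private static final String UNITS = "mhd";
     private static final int[] MAX_N = {59, 23, 28}; // upper bound per unit, same order as UNITS

     static String toQuartzCron(String period) {
       Matcher m = PERIOD.matcher(period);
       if (!m.matches()) {
         throw new IllegalArgumentException("Unsupported REFRESH EVERY period '" + period + "'");
       }
       int n = Integer.parseInt(m.group(1));
       char unit = m.group(2).charAt(0);
       int max = MAX_N[UNITS.indexOf(unit)];
       if (n < 1 || n > max) {
         throw new IllegalArgumentException(
             "REFRESH EVERY '" + period + "' is out of range: N must be 1.." + max + " for unit '" + unit + "'");
       }
       switch (unit) {
         case 'm':
           return n == 1 ? "0 * * * * ?" : "0 0/" + n + " * * * ?";
         case 'h':
           return n == 1 ? "0 0 * * * ?" : "0 0 0/" + n + " * * ?";
         default: // 'd': day-of-month stepping, restarts on the 1st of each month
           return n == 1 ? "0 0 0 * * ?" : "0 0 0 1/" + n + " * ?";
       }
     }

     // Inverse for SHOW CREATE: empty means no DDL form exists and the caller answers HTTP 400.
     static Optional<String> cronToPeriod(String cron) {
       for (int u = 0; u < UNITS.length(); u++) {
         for (int n = 1; n <= MAX_N[u]; n++) {
           String period = n + String.valueOf(UNITS.charAt(u));
           if (toQuartzCron(period).equals(cron)) {
             return Optional.of(period);
           }
         }
       }
       return Optional.empty();
     }
   }
   ```

   Enumerating the 110 legal periods keeps the inverse exact by construction: a cron string round-trips only if some accepted period would have produced exactly that string, so an equivalent but differently spelled cron such as `0 0/1 * * * ?` is still rejected.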
   
   ### MV-related `PROPERTIES`
   
   `PROPERTIES (...)` is a single flat string-to-string map. Keys are case-insensitive on input, and canonical casing is restored in the stored config. `MaterializedViewPropertyRouter` routes each key into a table-config field, the `task.MaterializedViewTask.*` block, or `customConfigs`, according to the rules below.
   
   #### Required
   
   | Key                | Type / format                              | Default | Description |
   |--------------------|--------------------------------------------|---------|-------------|
   | `timeColumnName`   | identifier                                 | n/a     | **Required.** Name of the MV column the consistency manager uses for watermark progression. Must match a `DATETIME` column declared in the column list with Pinot's `TIMESTAMP` data type (the analyzer rejects `LONG` / `INT` time columns at create time). |
   | `bucketTimePeriod` | period string, e.g. `'1h'`, `'1d'`, `'7d'` | n/a     | **Required.** Width of the materialization window. Determines the watermark advance step, the `DATETRUNC` unit allowed for the MV time-column expression in `AS <query>`, and the per-task time-range filter. Promoted to `task.MaterializedViewTask.bucketTimePeriod`. Validated by `TimeUtils.convertPeriodToMillis`; must be `> 0`. |
   
   #### Optional MV task knobs (bare form preferred; 
`task.MaterializedViewTask.<key>` also accepted)
   
   | Key                       | Type    | Default               | Description |
   |---------------------------|---------|-----------------------|-------------|
   | `bufferTimePeriod`        | period  | empty (no buffer)     | Lag kept between wall-clock time and the watermark when scheduling new APPEND windows, so the MV does not materialize windows the source table is still ingesting into. Must be `≥ 0`. |
   | `maxNumRecordsPerSegment` | integer | `5000000`             | Maximum rows per generated MV segment. Smaller values produce more segments (better parallelism, more metadata pressure); larger values consolidate. |
   | `maxTasksPerBatch`        | integer | `4` (hard cap `1000`) | Number of APPEND windows the scheduler dispatches per generator cycle. Back-filling `N` windows takes about `⌈N / maxTasksPerBatch⌉` cycles, so raising it speeds up historical back-fill. |
   | `stalenessThresholdMs`    | long ms | `0` (SLO disabled)    | Per-MV freshness SLO. When `(now - watermarkMs) > stalenessThresholdMs`, the broker excludes this MV from query rewrite and falls back to the base table. `0` disables the check (any non-zero watermark is acceptable). |
   | `taskMode`                | string  | scheduler-managed     | **Reserved.** The scheduler assigns the runtime task mode (`APPEND` / `OVERWRITE` / `DELETE`) per task; setting it in `PROPERTIES` is accepted by the grammar but has no effect on scheduling. It stays on the allow-list so existing JSON-API tables that set it round-trip through `SHOW CREATE` without warnings. |
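   The `stalenessThresholdMs` rule reads as a small predicate. This is a hypothetical sketch of the broker-side check, not Pinot's broker code; treating a zero watermark as "nothing materialized yet" is an assumption drawn from the "any non-zero watermark" wording above.

   ```java
   // Hypothetical sketch of the broker's freshness gate for one MV; not Pinot's broker code.
   final class StalenessCheckSketch {
     static boolean eligibleForRewrite(long nowMs, long watermarkMs, long stalenessThresholdMs) {
       if (watermarkMs <= 0) {
         return false; // assumption: a zero watermark means nothing has been materialized yet
       }
       if (stalenessThresholdMs == 0) {
         return true; // SLO disabled: any non-zero watermark is acceptable
       }
       // Excluded only when (now - watermarkMs) > stalenessThresholdMs, so equality still qualifies.
       return nowMs - watermarkMs <= stalenessThresholdMs;
     }
   }
   ```

   With `stalenessThresholdMs = 60000`, a watermark 45 s behind wall clock still qualifies for rewrite, while one 61 s behind sends queries back to the base table.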
   
   #### Optional table-level promoted knobs
   
   | Key            | Type       | Default                                    | Description |
   |----------------|------------|--------------------------------------------|-------------|
   | `replication`  | integer    | cluster default (typically `1`)            | Number of OFFLINE replicas for the MV's segments. |
   | `brokerTenant` | identifier | `DefaultTenant`                            | Broker tenant the MV is served from. |
   | `serverTenant` | identifier | `DefaultTenant`                            | Server tenant the MV's segments are placed on. |
   | `timeType`     | string     | inferred from the `DATETIME` column FORMAT | Almost never needed, because the column-level `FORMAT '..'` already conveys this. Accepted for parity with `CREATE TABLE`. |
   
   #### Forward-compat: extra task knobs
   
   | Pattern                      | Routing |
   |------------------------------|---------|
   | `task.<otherTaskType>.<key>` | Stored verbatim under `TableTaskConfig.taskTypeConfigsMap[otherTaskType]`, so a future minion task type composed onto an MV works without grammar changes. |
   | Any other unknown key        | Stored under `TableCustomConfig.customConfigs`: visible via `SHOW CREATE MATERIALIZED VIEW` for inspection, ignored by Pinot core. |
   
   #### Rejected (HTTP 400 at compile time)
   
   | Key                                                           | Why |
   |---------------------------------------------------------------|-----|
   | `schedule` (bare or `task.MaterializedViewTask.schedule`)     | Reserved; use `REFRESH EVERY '<period>'`. |
   | `definedSQL` (bare or `task.MaterializedViewTask.definedSQL`) | Reserved; written from the `AS <query>` clause. |
   | `streamType`, `stream.*`, `realtime.*`                        | MV is always OFFLINE. |
   | `tableType`, `tableName`, `ifNotExists`                       | Reserved DDL-clause names. |
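   The routing rules in the tables above amount to a single pass over the `PROPERTIES` map. The sketch below is hypothetical: class and field names are invented, promoted table keys are collapsed into one flat map instead of going through the shared `PropertyMapping`, and treating `timeColumnName` as a table-level key is an assumption. It follows the documented precedence: reserved keys are rejected, MV knobs (bare or prefixed) are canonicalized into the `MaterializedViewTask` block, promoted table keys are canonicalized, other `task.<type>.<key>` entries keep their task type, and everything else lands in custom configs.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Locale;
   import java.util.Map;
   import java.util.Set;

   // Hypothetical sketch of PROPERTIES routing; not the PR's MaterializedViewPropertyRouter.
   final class MvPropertyRoutingSketch {
     static final String MV_TASK = "MaterializedViewTask";
     private static final String MV_PREFIX = "task.materializedviewtask.";
     private static final List<String> MV_KNOBS = List.of("bucketTimePeriod", "bufferTimePeriod",
         "maxNumRecordsPerSegment", "maxTasksPerBatch", "stalenessThresholdMs", "taskMode");
     // Assumption: timeColumnName is routed as a table-level key in this sketch.
     private static final List<String> TABLE_KEYS = List.of("timeColumnName", "replication",
         "brokerTenant", "serverTenant", "timeType");
     private static final Set<String> RESERVED_CLAUSE_NAMES =
         Set.of("streamtype", "tabletype", "tablename", "ifnotexists");

     final Map<String, Map<String, String>> taskConfigs = new LinkedHashMap<>();
     final Map<String, String> tableFields = new LinkedHashMap<>();
     final Map<String, String> customConfigs = new LinkedHashMap<>();

     static MvPropertyRoutingSketch route(Map<String, String> properties) {
       MvPropertyRoutingSketch r = new MvPropertyRoutingSketch();
       for (Map.Entry<String, String> e : properties.entrySet()) {
         String key = e.getKey();
         String value = e.getValue();
         String lower = key.toLowerCase(Locale.ROOT);
         String bare = lower.startsWith(MV_PREFIX) ? lower.substring(MV_PREFIX.length()) : lower;

         if (bare.equals("schedule") || bare.equals("definedsql")) {
           throw new IllegalArgumentException("'" + key + "' is reserved; use REFRESH EVERY / AS <query>");
         }
         if (RESERVED_CLAUSE_NAMES.contains(lower) || lower.startsWith("stream.") || lower.startsWith("realtime.")) {
           throw new IllegalArgumentException("'" + key + "' is not allowed on a materialized view");
         }
         String knob = canonical(MV_KNOBS, bare);
         if (knob != null) {
           r.putTask(MV_TASK, knob, value);
           continue;
         }
         String tableKey = canonical(TABLE_KEYS, lower);
         if (tableKey != null) {
           r.tableFields.put(tableKey, value);
           continue;
         }
         // task.<type>.<key>: split after the 5-char "task." prefix at the next dot.
         int dot = lower.startsWith("task.") ? key.indexOf('.', 5) : -1;
         if (dot > 5 && dot < key.length() - 1) {
           String type = key.substring(5, dot);
           r.putTask(type.equalsIgnoreCase(MV_TASK) ? MV_TASK : type, key.substring(dot + 1), value);
           continue;
         }
         r.customConfigs.put(key, value);
       }
       return r;
     }

     private void putTask(String taskType, String key, String value) {
       taskConfigs.computeIfAbsent(taskType, t -> new LinkedHashMap<>()).put(key, value);
     }

     // Case-insensitive lookup returning the canonical spelling, or null if unknown.
     private static String canonical(List<String> names, String lowerKey) {
       for (String name : names) {
         if (name.toLowerCase(Locale.ROOT).equals(lowerKey)) {
           return name;
         }
       }
       return null;
     }
   }
   ```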
   
   ## Examples
   
   ### 1. Minimal — defaults everywhere, no explicit schedule
   
   Runs under the cluster-wide MV cron; default `maxNumRecordsPerSegment`, 
default `maxTasksPerBatch`, no staleness SLO, no buffer.
   
   ```sql
   CREATE MATERIALIZED VIEW clickstream_daily_event_counts (
       event_day  TIMESTAMP NOT NULL,
       event_name STRING    NOT NULL,
       cnt        LONG      NOT NULL
   )
   PROPERTIES (
       'timeColumnName'   = 'event_day',
       'bucketTimePeriod' = '1d'
   )
   AS
   SELECT DATETRUNC('DAY', ts * 1000) AS event_day,
          event_name,
          COUNT(*)                    AS cnt
   FROM clickstreamFunnel
   GROUP BY event_day, event_name;
   ```
   
   ### 2. Sub-hour refresh + bounded staleness SLO
   
   `REFRESH EVERY '1m'` emits the `0 * * * * ?` cron. The 60-second `stalenessThresholdMs` makes the broker stop rewriting queries onto this MV once its watermark falls more than a minute behind wall clock, which matters for near-real-time dashboards where a stale MV is worse than the base table.
   
   ```sql
   CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.clickstream_minute_dau (
       minute_ts TIMESTAMP NOT NULL,
       dau       LONG      NOT NULL
   )
   REFRESH EVERY '1m'
   PROPERTIES (
       'timeColumnName'        = 'minute_ts',
       'bucketTimePeriod'      = '1m',
       'bufferTimePeriod'      = '30s',
       'stalenessThresholdMs'  = '60000'
   )
   AS
   SELECT DATETRUNC('MINUTE', ts * 1000) AS minute_ts,
          COUNT(*)                       AS dau
   FROM clickstreamFunnel
   GROUP BY minute_ts;
   ```
   
   ### 3. Heavy daily roll-up with aggressive back-fill and segment sizing
   
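   `maxTasksPerBatch=16` dispatches four times as many APPEND windows per generator cycle as the default of 4, so onboarding back-fill needs roughly a quarter as many cycles (given enough minion capacity to run them). `maxNumRecordsPerSegment=2000000` splits large windows into smaller segments for better scan parallelism.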
   
   ```sql
   CREATE MATERIALIZED VIEW analytics.clickstream_daily_funnel (
       event_day      TIMESTAMP NOT NULL,
       event_name     STRING    NOT NULL,
       user_id        LONG      NOT NULL,
       event_count    LONG      NOT NULL,
       total_dwell_ms LONG      NOT NULL
   )
   REFRESH EVERY '15m'
   PROPERTIES (
       'timeColumnName'          = 'event_day',
       'bucketTimePeriod'        = '1d',
       'bufferTimePeriod'        = '1h',
       'maxNumRecordsPerSegment' = '2000000',
       'maxTasksPerBatch'        = '16',
       'replication'             = '2'
   )
   AS
   SELECT DATETRUNC('DAY', ts * 1000) AS event_day,
          event_name,
          user_id,
          COUNT(*)                    AS event_count,
          SUM(dwell_ms)               AS total_dwell_ms
   FROM clickstreamFunnel
   GROUP BY event_day, event_name, user_id;
   ```
   
   ### 4. Inspect / re-apply with `SHOW CREATE`
   
   `SHOW CREATE MATERIALIZED VIEW` emits a statement that re-parses to the same 
`TableConfig`. Useful for GitOps / config drift detection.
   
   ```sql
   SHOW CREATE MATERIALIZED VIEW analytics.clickstream_daily_funnel;
   ```
   
   Returns (pretty-printed):
   
   ```sql
   CREATE MATERIALIZED VIEW analytics.clickstream_daily_funnel (
       event_day      TIMESTAMP NOT NULL,
       event_name     STRING    NOT NULL,
       user_id        LONG      NOT NULL,
       event_count    LONG      NOT NULL,
       total_dwell_ms LONG      NOT NULL
   )
   REFRESH EVERY '15m'
   PROPERTIES (
       'timeColumnName'          = 'event_day',
       'bucketTimePeriod'        = '1d',
       'bufferTimePeriod'        = '1h',
       'maxNumRecordsPerSegment' = '2000000',
       'maxTasksPerBatch'        = '16',
       'replication'             = '2'
   )
   AS
   SELECT DATETRUNC('DAY', ts * 1000) AS event_day,
          event_name,
          user_id,
          COUNT(*)                    AS event_count,
          SUM(dwell_ms)               AS total_dwell_ms
   FROM clickstreamFunnel
   GROUP BY event_day, event_name, user_id;
   ```
   
   ### 5. Idempotent teardown
   
   ```sql
   DROP MATERIALIZED VIEW IF EXISTS analytics.clickstream_daily_funnel;
   ```
   
   `IF EXISTS` turns a missing MV into a no-op (HTTP 200). It does **not** swallow a type mismatch: if the name resolves to a plain OFFLINE table, the call returns 400 pointing at `DROP TABLE`. Silently no-op'ing on type confusion would let a provisioning script believe it had cleaned up an MV while a real table was still standing.
   
   ### 6. POSTing DDL to the controller
   
   The UI Query Console talks to `/sql` (a broker proxy), which now explicitly rejects DDL. For now, operators POST DDL directly to the controller's `/sql/ddl` endpoint:
   
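   ```bash
   # jq -Rs reads the whole file as one raw string and JSON-escapes quotes, newlines and tabs,
   # so multi-line SQL keeps its line breaks instead of being glued together.
   jq -Rs '{sql: .}' < create_mv.sql |
     curl -sS -u "$PINOT_USER:$PINOT_PASS" \
          -H 'Content-Type: application/json' \
          --data-binary @- \
          "$PINOT_CONTROLLER/sql/ddl"
   ```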
   
   Wiring DDL into the UI is tracked as a follow-up.
   
   ## Motivation
   
   - MV authoring/inspection/teardown via JSON is the single biggest UX gap 
remaining around the MV feature shipped earlier. Operators copy-paste 50-line 
JSON blobs and routinely mis-configure `timeColumnName` / `bucketTimePeriod` / 
cron expressions.
   - Plain tables already have `CREATE TABLE` / `SHOW CREATE TABLE` / `DROP 
TABLE` on `/sql/ddl`. MVs being the lone exception forces operators to 
context-switch between two management styles for the same cluster.
   - "Reverse DDL" (`SHOW CREATE`) is required for any GitOps / config-drift 
detection workflow on MVs.
   
   ## Forward path (DDL → ZK)
   
   **Grammar** (`pinot-common/src/main/codegen/includes/parserImpls.ftl`):
   
   - New AST nodes: `SqlPinotCreateMaterializedView`, 
`SqlPinotShowCreateMaterializedView`, `SqlPinotDropMaterializedView`, 
`SqlPinotRefreshClause`.
   - `SqlPinotShow` and `SqlPinotDrop` are unified into single entry points that branch internally on `TABLE` vs `MATERIALIZED VIEW`. This keeps each JavaCC choice point unambiguous after one leading keyword and avoids raising lookahead across the whole parser.
   
   **Compile** (`pinot-sql-ddl/.../compile/`):
   
   - `DdlOperation` gains `CREATE_MATERIALIZED_VIEW` / 
`SHOW_CREATE_MATERIALIZED_VIEW` / `DROP_MATERIALIZED_VIEW`.
   - New `CompiledCreate/ShowCreate/DropMaterializedView` classes mirror the table variants but omit `tableType`. An MV is always OFFLINE: the consistency manager does not yet receive realtime segment-commit notifications, so an MV over a REALTIME source would silently miss STALE marking.
   - `MaterializedViewPropertyRouter`:
     - Translates `REFRESH EVERY '<period>'` into the exact Quartz cron 
expression and stores under `task.MaterializedViewTask.schedule`.
     - Flattens canonical MV task knobs (`bucketTimePeriod`, 
`bufferTimePeriod`, `maxNumRecordsPerSegment`, `maxTasksPerBatch`, `taskMode`, 
`stalenessThresholdMs`) from bare DDL form into the MV task block with 
canonical casing.
     - Routes everything else through the shared `PropertyMapping` (so e.g. 
retention/tenant/index knobs work identically to `CREATE TABLE`).
     - Rejects `schedule` / `definedSQL` in `PROPERTIES` (reserved for `REFRESH 
EVERY` / `AS <query>` clauses).
   - `AS <query>` is stored under `task.MaterializedViewTask.definedSQL` as the **raw substring the user typed**. The span comes from parser positions, with full-span recovery for top-level `SqlOrderBy` / `SqlWith` / `SqlExplain` wrappers (regressions with `LIMIT` and top-level `ORDER BY` caught this), and the extracted text is re-parsed on its own to verify that it is valid standalone.
   - `REFRESH EVERY` is optional: omitting it leaves no per-table `schedule` 
key so `PinotTaskManager` falls back to the cluster-wide MV cron.
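   Preserving `AS <query>` as raw user text amounts to mapping a parser position span back onto the original statement string. The helper below is a hypothetical sketch: it assumes 1-based line and column numbers with an inclusive end column and `\n` line breaks, which is our reading of Calcite's `SqlParserPos` and has not been checked against the PR.

   ```java
   // Hypothetical sketch: map a 1-based, end-inclusive (line, column) span back to the raw SQL text.
   final class QuerySpanSketch {
     // Converts a 1-based (line, column) position into a 0-based string offset; assumes '\n' line breaks.
     static int toOffset(String sql, int line, int column) {
       int offset = 0;
       for (int l = 1; l < line; l++) {
         int newline = sql.indexOf('\n', offset);
         if (newline < 0) {
           throw new IllegalArgumentException("line " + line + " is past the end of the statement");
         }
         offset = newline + 1;
       }
       return offset + column - 1;
     }

     // Returns the exact characters between the two positions, end column inclusive.
     static String extract(String sql, int startLine, int startColumn, int endLine, int endColumn) {
       return sql.substring(toOffset(sql, startLine, startColumn), toOffset(sql, endLine, endColumn) + 1);
     }
   }
   ```

   Because the text is sliced rather than unparsed, the stored query keeps the user's formatting and identifier quoting. The catch, as noted above, is that wrapper nodes such as a top-level `ORDER BY ... LIMIT` must contribute their full span, or the tail of the query is cut off.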
   
   ## Reverse path (TableConfig → DDL)
   
   - `CanonicalDdlEmitter` dispatches on the MV marker (presence of 
`task.MaterializedViewTask.definedSQL`) and emits `CREATE MATERIALIZED VIEW`.
   - `REFRESH EVERY '<period>'` is emitted only when the stored cron round-trips via `MaterializedViewPropertyRouter.cronToPeriod`. **Non-standard cron expressions are rejected with HTTP 400** rather than emitted as opaque task properties, because an un-round-trippable DDL would break copy-paste-reapply workflows. Operators with hand-crafted cron expressions get an error pointing them back to the JSON API.
   - `MaterializedViewPropertyExtractor`:
     - Filters synthetic keys (`definedSQL` / `schedule`, both surfaced by 
dedicated DDL clauses).
     - Flattens canonical knobs back to bare DDL form so `emit → parse → emit` 
produces identical canonical text.
     - Keeps unknown `task.<taskType>.<key>` entries verbatim under their 
prefix (forward-compat).
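   The reverse flattening can be sketched as building a fresh `PROPERTIES` map from the stored task configs. The class below is hypothetical, and the fixed knob order plus the sorting of leftover keys are assumptions for illustration; it is not the PR's `MaterializedViewPropertyExtractor`, and it covers only the task-config part of the output.

   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeMap;

   // Hypothetical sketch of the reverse path's PROPERTIES flattening; not the PR's extractor.
   final class MvPropertyFlattenSketch {
     static final String MV_TASK = "MaterializedViewTask";
     // Assumed emission order for canonical knobs (bare DDL form).
     static final List<String> KNOBS = List.of("bucketTimePeriod", "bufferTimePeriod",
         "maxNumRecordsPerSegment", "maxTasksPerBatch", "taskMode", "stalenessThresholdMs");
     // Surfaced by dedicated DDL clauses (AS <query>, REFRESH EVERY), never by PROPERTIES.
     static final Set<String> SYNTHETIC = Set.of("definedSQL", "schedule");

     static Map<String, String> toDdlProperties(Map<String, Map<String, String>> taskTypeConfigs) {
       Map<String, String> out = new LinkedHashMap<>();
       Map<String, String> mv = taskTypeConfigs.getOrDefault(MV_TASK, Map.of());
       for (String knob : KNOBS) {
         String value = mv.get(knob);
         if (value != null) {
           out.put(knob, value);
         }
       }
       // Everything else keeps its task.<type>.<key> prefix, sorted so repeated emits match byte for byte.
       Map<String, String> rest = new TreeMap<>();
       for (Map.Entry<String, Map<String, String>> task : taskTypeConfigs.entrySet()) {
         boolean isMv = task.getKey().equals(MV_TASK);
         for (Map.Entry<String, String> e : task.getValue().entrySet()) {
           if (isMv && (KNOBS.contains(e.getKey()) || SYNTHETIC.contains(e.getKey()))) {
             continue;
           }
           rest.put("task." + task.getKey() + "." + e.getKey(), e.getValue());
         }
       }
       out.putAll(rest);
       return out;
     }
   }
   ```

   Building a new map instead of removing entries from the source while iterating also sidesteps the iterator-mutation pitfall described under the live-validation bugs.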
   
   ## Controller (`PinotDdlRestletResource`)
   
   - **`executeCreateMaterializedView`**: REUSES the regular table CREATE 
pipeline (validate → addSchema → addTable) but routes through the MV-specific 
compile branch first.
     - Pre-existing schemas with the same column shape are reused (idempotent 
retry); mismatched shapes → 409.
     - Schema-race recovery: `SchemaAlreadyExistsException` caught, racing 
schema fetched + re-validated, then `addTable` proceeds.
     - Dry-run validates without persisting (returns the full would-be 
`TableConfig` + `Schema`).
     - Authz: `AccessType.CREATE` + `Actions.Table.CREATE_TABLE`. This reuses the existing permission on purpose: a new `CREATE_MATERIALIZED_VIEW` action would silently lock operators who hold only `CREATE_TABLE` out of MV creation during a rolling upgrade.
   - **`executeShowCreateMaterializedView`**: looks up the `_OFFLINE` variant 
directly (no dual-variant handshake needed), authorizes via `GET_TABLE_CONFIG`, 
emits via `CanonicalDdlEmitter`.
   - **`executeDropMaterializedView`**: delegates cleanup entirely to `PinotHelixResourceManager.deleteTable`, which already removes the MV definition and runtime znodes (`MaterializedViewDefinitionMetadataUtils` / `RuntimeMetadataUtils`) and unregisters the MV from `MaterializedViewConsistencyManager`. This is the same code path the legacy `DELETE /materializedViews/{name}` endpoint uses. It also reuses `assertNoLogicalTableReferences` and `assertNoActiveTasksBeforeDrop`, so the safety guards stay in lockstep with `DROP TABLE`.
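   The schema-reuse and race-recovery steps of `executeCreateMaterializedView` follow a common check-then-act pattern. The sketch below uses stand-in types (`Schema`, `SchemaStore`, and the exception classes) that are hypothetical, not Pinot's APIs, and it simplifies "same column shape" to an ordered list of column specs.

   ```java
   import java.util.List;
   import java.util.Optional;

   // Hypothetical sketch: reuse a same-shape schema, 409 on mismatch, recover from a create race.
   final class SchemaResolutionSketch {
     static final class Schema {
       final String name;
       final List<String> columns; // simplification: "shape" is an ordered list of column specs

       Schema(String name, List<String> columns) {
         this.name = name;
         this.columns = List.copyOf(columns);
       }
     }

     static final class SchemaAlreadyExistsException extends Exception {}

     // Maps to HTTP 409 at the REST layer.
     static final class SchemaConflictException extends RuntimeException {
       SchemaConflictException(String message) {
         super(message);
       }
     }

     interface SchemaStore {
       Optional<Schema> get(String name);

       void add(Schema schema) throws SchemaAlreadyExistsException;
     }

     static Schema resolve(SchemaStore store, Schema desired) {
       Optional<Schema> existing = store.get(desired.name);
       if (existing.isPresent()) {
         return requireSameShape(existing.get(), desired); // idempotent retry path
       }
       try {
         store.add(desired);
         return desired;
       } catch (SchemaAlreadyExistsException race) {
         // A concurrent request created the schema between get() and add():
         // fetch the winner and validate it like any pre-existing schema.
         Schema winner = store.get(desired.name)
             .orElseThrow(() -> new IllegalStateException("schema vanished after race"));
         return requireSameShape(winner, desired);
       }
     }

     private static Schema requireSameShape(Schema existing, Schema desired) {
       if (!existing.columns.equals(desired.columns)) {
         throw new SchemaConflictException("Schema '" + desired.name + "' already exists with different columns");
       }
       return existing;
     }
   }
   ```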
   
   ## Strict TABLE / MATERIALIZED VIEW partitioning at the REST boundary
   
   The "Q2=B" contract is enforced symmetrically across all three verbs:
   
   | Verb on wrong target                                | HTTP          | Error message |
   |-----------------------------------------------------|---------------|---------------|
   | `SHOW CREATE TABLE` on an MV                        | **400**       | *"… is a materialized view. Use 'SHOW CREATE MATERIALIZED VIEW …'"* |
   | `SHOW CREATE MATERIALIZED VIEW` on a plain table    | **400**       | *"… is not a materialized view. Use 'SHOW CREATE TABLE …'"* |
   | `DROP TABLE` on an MV                               | **400**       | *"… is a materialized view. Use 'DROP MATERIALIZED VIEW …'"* |
   | `DROP MATERIALIZED VIEW` on a plain table           | **400**       | *"… is not a materialized view. Use 'DROP TABLE …'"* |
   | `DROP MATERIALIZED VIEW IF EXISTS` on a plain table | **still 400** | `IF EXISTS` is about absence, not type confusion. |
   
   The MV-specific `SHOW` / `DROP` forms intentionally have **no `TYPE` 
clause**; a trailing `TYPE OFFLINE` fails parsing rather than being silently 
accepted.
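   The partitioning rule reduces to comparing what the verb expects with what the name resolves to. This is a hypothetical preflight sketch, not `PinotDdlRestletResource`: it detects an MV by the marker the reverse path uses (presence of `task.MaterializedViewTask.definedSQL`), and answering 404 for a missing target without `IF EXISTS` is an assumption this description does not state.

   ```java
   import java.util.Map;

   // Hypothetical sketch of the TABLE / MATERIALIZED VIEW partitioning rule at the REST boundary.
   final class DdlTargetContractSketch {
     enum Verb { SHOW_CREATE_TABLE, SHOW_CREATE_MATERIALIZED_VIEW, DROP_TABLE, DROP_MATERIALIZED_VIEW }

     // MV marker used by the reverse path: presence of task.MaterializedViewTask.definedSQL.
     static boolean isMaterializedView(Map<String, Map<String, String>> taskTypeConfigs) {
       Map<String, String> mv = taskTypeConfigs.get("MaterializedViewTask");
       return mv != null && mv.containsKey("definedSQL");
     }

     /**
      * @param taskTypeConfigs task configs of the resolved table, or null if the name does not exist
      * @return 200 if the request may proceed (or is a no-op), otherwise the error status
      */
     static int preflightStatus(Verb verb, Map<String, Map<String, String>> taskTypeConfigs, boolean ifExists) {
       if (taskTypeConfigs == null) {
         // IF EXISTS turns a missing target into a no-op; 404 otherwise is an assumption.
         return ifExists ? 200 : 404;
       }
       boolean wantsMv = verb == Verb.SHOW_CREATE_MATERIALIZED_VIEW || verb == Verb.DROP_MATERIALIZED_VIEW;
       // Type confusion is always 400, even with IF EXISTS.
       return wantsMv == isMaterializedView(taskTypeConfigs) ? 200 : 400;
     }
   }
   ```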
   
   ## Live validation against a local cluster (and two bugs caught)
   
   Tested end-to-end against a local Pinot cluster with real production-style 
MVs (`airlineStats_mv`, `orders_daily_mv`) plus three freshly DDL-created MVs 
over `clickstreamFunnel`. Two latent bugs surfaced and were fixed in the same 
commit:
   
   1. **TreeMap iterator value corruption in 
`MaterializedViewPropertyExtractor`** (data loss):
      - When the node being deleted has two children, `TreeMap.deleteEntry()` copies its in-order successor's `key`/`value` into that node and then unlinks the successor's node instead. The `Map.Entry` returned by the iterator is that same node object, so reading `entry.getValue()` after `it.remove()` returned the **successor's** value.
      - Visible symptom: `SHOW CREATE MATERIALIZED VIEW orders_daily_mv` emitted `'maxNumRecordsPerSegment' = 'dayMillis'` instead of `'10000'`; `'dayMillis'` is the value of the lexicographically next key, `timeColumnName`.
      - Fix: snapshot `entry.getValue()` before calling `it.remove()`.
   2. **Trailing `;` doubling on `AS <query>` emission**:
      - Legacy MV configs sometimes stored `definedSQL` with a trailing `;`. The emitter unconditionally appended its own `;\n`, producing `;;\n`, which fails strict re-parse.
      - Fix: `stripTrailingSemicolons` normalizes any run of trailing whitespace and `;` characters to exactly one `;`.
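   Both failure modes are easy to reproduce in isolation. The sketch below is hypothetical: the `moveKeys` helper and its flag are invented, and `stripTrailingSemicolons` borrows the PR's method name but not its code. The buggy branch depends on OpenJDK's `TreeMap` internals; the `Map.Entry` contract only says an entry's behavior is undefined once the backing map has been modified other than through the entry's `setValue`, so snapshotting before `remove()` is the portable fix.

   ```java
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeMap;

   // Hypothetical reproduction of the two emitter bugs and their fixes; not the PR's code.
   final class EmitterBugSketch {
     // Moves the listed keys out of 'config' into the returned map, removing them via the iterator.
     static Map<String, String> moveKeys(TreeMap<String, String> config, Set<String> keys,
         boolean snapshotBeforeRemove) {
       Map<String, String> moved = new LinkedHashMap<>();
       Iterator<Map.Entry<String, String>> it = config.entrySet().iterator();
       while (it.hasNext()) {
         Map.Entry<String, String> entry = it.next();
         String key = entry.getKey();
         if (!keys.contains(key)) {
           continue;
         }
         if (snapshotBeforeRemove) {
           String value = entry.getValue(); // fixed: read while the entry is still intact
           it.remove();
           moved.put(key, value);
         } else {
           it.remove();
           // buggy: if the removed node had two children, OpenJDK's TreeMap has already copied
           // its in-order successor's key/value into this same Entry object
           moved.put(key, entry.getValue());
         }
       }
       return moved;
     }

     // Normalizes any run of trailing whitespace and ';' to exactly one ';'.
     static String stripTrailingSemicolons(String sql) {
       int end = sql.length();
       while (end > 0 && (Character.isWhitespace(sql.charAt(end - 1)) || sql.charAt(end - 1) == ';')) {
         end--;
       }
       return sql.substring(0, end) + ";";
     }
   }
   ```

   With `definedSQL`, `maxNumRecordsPerSegment`, and `timeColumnName` as the only keys, `maxNumRecordsPerSegment` sits at the root of the red-black tree with two children, so the buggy branch yields `timeColumnName`'s value, mirroring the `'dayMillis'` symptom above.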
   
   Both bugs have golden regression tests 
(`canonicalKnobValuesSurviveTreeMapDeleteRebalance`, 
`trailingSemicolonInDefinedSqlNotDoubledOnEmit`, `mult

