This is an automated email from the ASF dual-hosted git repository. xiangfu0 pushed a commit to branch mse-mv-ddl-extensibility in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 83c9a1fda7e7ffbd2b0762da217c5f3868027fad Author: Xiang Fu <[email protected]> AuthorDate: Sat May 30 00:26:11 2026 -0700 Make CREATE MATERIALIZED VIEW DDL pluggable via MaterializedViewDdlHandler OSS Pinot's CREATE MATERIALIZED VIEW is single-stage-engine only: a JOIN in the AS <query> clause is rejected, and the MV is always routed under the MaterializedViewTask task type. Downstream distributions that materialize an MV via a different engine/task type (e.g. a multi-stage-engine generator that supports JOINs) previously had no clean way to override this. Introduce a MaterializedViewDdlHandler SPI that owns the MV-specific compile decisions — query validation (join policy) and task-config routing (which task type to stamp). DefaultMaterializedViewDdlHandler preserves today's behavior exactly (reject JOINs, route under MaterializedViewTask), and is the default in MaterializedViewDdlHandlerRegistry. DdlCompiler.compileCreateMaterializedView delegates to the registered handler and validates MV consistency against the task type the handler stamped. Also: - MaterializedViewPropertyRouter.apply gains a taskType-parameterized overload so a handler can route the MV task config under an alternative task type. - verifyDefinedSqlIsParseable now does a syntactic parse (compileToSqlNodeAndOptions) instead of full single-stage compilation (compileToPinotQuery), so the slicing-bug guard accepts multi-stage shapes (e.g. JOINs) a handler may permit; engine-specific validity is still enforced later by the MV analyzer / task generator. Apart from that relaxed re-parse check (an AS <query> that parses but would have failed single-stage compilation is no longer rejected at this step; its engine-specific validation is now left to the MV analyzer / task generator), behavior is unchanged when no alternative handler is registered. All existing pinot-sql-ddl tests pass. 
Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]> --- .../apache/pinot/sql/ddl/compile/DdlCompiler.java | 78 +++++++----------- .../compile/DefaultMaterializedViewDdlHandler.java | 52 ++++++++++++ .../ddl/compile/MaterializedViewDdlHandler.java | 93 ++++++++++++++++++++++ .../MaterializedViewDdlHandlerRegistry.java | 43 ++++++++++ .../compile/MaterializedViewPropertyRouter.java | 12 ++- 5 files changed, 227 insertions(+), 51 deletions(-) diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java index 61bd20c45ae..df6b21031a0 100644 --- a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java @@ -30,7 +30,6 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.sql.SqlBasicTypeNameSpec; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlIdentifier; -import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlLiteral; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; @@ -526,9 +525,12 @@ public final class DdlCompiler { List<String> warnings = new ArrayList<>(); Map<String, String> properties = resolveProperties(node.getProperties().getList()); - // Reject JOIN early so the inferer's single-source assumption (and the analyzer's - // downstream check) cannot be violated by a definedSql we haven't validated yet. - rejectJoinInDefinedSql(node.getQuery()); + // Validate the AS-clause query via the registered handler before column resolution / schema + // inference. The default handler rejects JOINs (single-source SSE MVs); an alternative handler + // (e.g. a multi-stage-engine MV) may accept them. Done early so the single-source inferer's + // assumption cannot be violated by a definedSql we haven't validated yet. 
+ MaterializedViewDdlHandler mvHandler = MaterializedViewDdlHandlerRegistry.getHandler(); + mvHandler.validateDefinedQuery(node.getQuery(), properties); String definedSql = extractDefinedSql(originalSql, node.getQuery()); verifyDefinedSqlIsParseable(definedSql); @@ -539,6 +541,14 @@ public final class DdlCompiler { // surfaces a clear message when it's absent. List<ResolvedColumnDefinition> columns; if (node.getColumns().getList().isEmpty()) { + // Schema inference from the AS <query> projection is single-source-only. A handler that + // accepted a JOIN above (e.g. multi-stage-engine MV) must require an explicit column list, + // because the inferer cannot resolve a multi-source projection. + if (MaterializedViewDdlHandler.containsJoin(node.getQuery())) { + throw new DdlCompilationException( + "CREATE MATERIALIZED VIEW with a JOIN in the AS clause requires an explicit column " + + "list; schema inference is only supported for single-source materialized views."); + } // Fall back to the request-header database when the DDL itself does not qualify the // MV name — Calcite needs SOME database to resolve `FROM src` against, and the // header's intent ("operate on database X") matches where the MV will be created. @@ -598,10 +608,12 @@ public final class DdlCompiler { TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE) .setTableName(tableNameForConfig) .setIsMaterializedView(true); - MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder); + // The handler routes the MV properties onto the builder and returns the minion task type it + // stamped (default: MaterializedViewTask). The consistency check below uses that task type. 
+ String mvTaskType = mvHandler.applyTaskConfig(properties, definedSql, schedule, builder); TableConfig tableConfig = builder.build(); - validateMaterializedViewConsistency(schema, tableConfig); + validateMaterializedViewConsistency(schema, tableConfig, mvTaskType); return new CompiledCreateMaterializedView(resolved.getDatabaseName(), schema, tableConfig, node.isIfNotExists(), warnings); @@ -734,54 +746,19 @@ public final class DdlCompiler { + sql.length() + "."); } - /// Walks the parsed AS-clause AST and throws a clear, MV-specific error if any JOIN node - /// is found at any depth (top-level FROM, subquery FROM, lateral join, etc.). See the - /// call-site comment in [#compileCreateMaterializedView] for *why* we do this against the - /// AST rather than letting the downstream re-parse / analyzer surface the limitation. - private static void rejectJoinInDefinedSql(SqlNode queryNode) { - if (containsJoin(queryNode)) { - throw new DdlCompilationException( - "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. " - + "Materialized views currently read from a single source table; " - + "pre-join the inputs into a base table and reference that single table " - + "in the MV definition."); - } - } - - /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. We walk via the - /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays wrapper-agnostic - /// (SqlOrderBy, SqlWith, SqlExplain) — mirroring how [#collectPositions] walks the tree. - /// Leaves (SqlLiteral, SqlIdentifier, ...) terminate the recursion naturally. 
- private static boolean containsJoin(@Nullable SqlNode node) { - if (node == null) { - return false; - } - if (node.getKind() == SqlKind.JOIN) { - return true; - } - if (node instanceof SqlCall) { - for (SqlNode child : ((SqlCall) node).getOperandList()) { - if (containsJoin(child)) { - return true; - } - } - } else if (node instanceof SqlNodeList) { - for (SqlNode child : (SqlNodeList) node) { - if (containsJoin(child)) { - return true; - } - } - } - return false; - } - /// Sanity check: the substring we extracted must be a standalone parseable Pinot query. /// This guards against DDL-layer slicing bugs (off-by-one in [#lineColToOffset], parser /// position quirks) so the error surfaces in the DDL layer rather than at first scheduler /// tick or at create-time analysis (PR 3). + /// + /// Uses a syntactic parse (`compileToSqlNodeAndOptions`) rather than full single-stage + /// compilation (`compileToPinotQuery`): the slicing-bug guard only needs to confirm the extracted + /// text is a well-formed query, and the syntactic parse accepts multi-stage shapes (e.g. JOINs) + /// that a [MaterializedViewDdlHandler] may permit. Engine-specific validity is enforced later by + /// the MV analyzer / task generator for whichever task type the handler stamped. private static void verifyDefinedSqlIsParseable(String definedSql) { try { - CalciteSqlParser.compileToPinotQuery(definedSql); + CalciteSqlParser.compileToSqlNodeAndOptions(definedSql); } catch (Exception e) { throw new DdlCompilationException( "AS <query> did not re-parse as a Pinot query: " + e.getMessage() @@ -791,7 +768,8 @@ public final class DdlCompiler { /// Cross-checks: `timeColumnName` must reference a declared DATETIME column, and /// `bucketTimePeriod` must be present so the scheduler has a window size. 
- private static void validateMaterializedViewConsistency(Schema schema, TableConfig tableConfig) { + private static void validateMaterializedViewConsistency(Schema schema, TableConfig tableConfig, + String mvTaskType) { String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); if (timeColumnName == null || timeColumnName.isEmpty()) { throw new DdlCompilationException( @@ -809,7 +787,7 @@ public final class DdlCompiler { } Map<String, String> mvTaskConfig = tableConfig.getTaskConfig() == null ? null - : tableConfig.getTaskConfig().getConfigsForTaskType(MaterializedViewTask.TASK_TYPE); + : tableConfig.getTaskConfig().getConfigsForTaskType(mvTaskType); if (mvTaskConfig == null || !mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) { throw new DdlCompilationException( diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java new file mode 100644 index 00000000000..7b49d7c0832 --- /dev/null +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.sql.ddl.compile; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.sql.SqlNode; +import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; + + +/// Default, single-stage-engine (SSE) materialized-view DDL handler. +/// +/// A materialized view reads from exactly one source table: a JOIN in the `AS <query>` clause is +/// rejected, and the MV task configuration is routed under the built-in `MaterializedViewTask` task +/// type. This is the behavior of OSS Pinot when no alternative handler is registered. +public class DefaultMaterializedViewDdlHandler implements MaterializedViewDdlHandler { + + @Override + public void validateDefinedQuery(SqlNode queryNode, Map<String, String> properties) { + if (MaterializedViewDdlHandler.containsJoin(queryNode)) { + throw new DdlCompilationException( + "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. 
" + + "Materialized views currently read from a single source table; " + + "pre-join the inputs into a base table and reference that single table " + + "in the MV definition."); + } + } + + @Override + public String applyTaskConfig(Map<String, String> properties, String definedSql, + @Nullable String schedule, TableConfigBuilder builder) { + MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder); + return MaterializedViewTask.TASK_TYPE; + } +} diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java new file mode 100644 index 00000000000..34bb8ea8696 --- /dev/null +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.sql.ddl.compile; + +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; + + +/// Extension point for compiling `CREATE MATERIALIZED VIEW ... AS <query>` statements. +/// +/// The default implementation ([DefaultMaterializedViewDdlHandler]) restricts an MV to a single +/// source table (the single-stage-engine model: a JOIN in the `AS` clause is rejected) and routes +/// the MV task configuration under the built-in `MaterializedViewTask` task type. +/// +/// Downstream distributions can register an alternative handler via +/// [MaterializedViewDdlHandlerRegistry#setHandler] to support richer MV definitions — for example a +/// multi-stage-engine MV whose `AS <query>` is a JOIN, materialized by a different minion task type. +/// Because the controller validates the resulting `TableConfig` (running the task generator's +/// validation) *before* persisting it, a handler that wants to accept a JOIN MUST stamp a task type +/// whose generator can validate that definition — stamping the built-in `MaterializedViewTask` for a +/// JOIN would be rejected by the single-source [org.apache.pinot.materializedview.analysis +/// .MaterializedViewAnalyzer]. The two-method split mirrors the compiler's order: the query is +/// validated up front (before column resolution / schema inference), task routing happens after the +/// schema has been built. +public interface MaterializedViewDdlHandler { + + /// Validates the parsed `AS <query>` clause against the MV's DDL properties. Called before column + /// resolution / schema inference. Implementations should throw [DdlCompilationException] with a + /// user-actionable message when the definition is not supported. 
+ /// + /// @param queryNode the parsed `AS <query>` SqlNode + /// @param properties the MV's `PROPERTIES (...)` map (raw keys/values as typed by the user) + void validateDefinedQuery(SqlNode queryNode, Map<String, String> properties); + + /// Routes the MV's `PROPERTIES (...)` plus the synthetic `definedSql` / `schedule` onto `builder` + /// and returns the minion task type that was stamped. The returned task type is used to validate + /// MV consistency (e.g. that `bucketTimePeriod` is present under it). + /// + /// @param properties the MV's `PROPERTIES (...)` map + /// @param definedSql the verbatim `AS <query>` text + /// @param schedule the Quartz cron derived from `REFRESH EVERY`, or {@code null} when absent + /// @param builder the table-config builder to populate (already has name + isMaterializedView) + /// @return the minion task type stamped onto the builder (e.g. {@code MaterializedViewTask}) + String applyTaskConfig(Map<String, String> properties, String definedSql, @Nullable String schedule, + TableConfigBuilder builder); + + /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. Walks the + /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays wrapper-agnostic + /// (SqlOrderBy, SqlWith, SqlExplain). Leaves (SqlLiteral, SqlIdentifier, ...) terminate naturally. 
+ static boolean containsJoin(@Nullable SqlNode node) { + if (node == null) { + return false; + } + if (node.getKind() == SqlKind.JOIN) { + return true; + } + if (node instanceof SqlCall) { + for (SqlNode child : ((SqlCall) node).getOperandList()) { + if (containsJoin(child)) { + return true; + } + } + } else if (node instanceof SqlNodeList) { + for (SqlNode child : (SqlNodeList) node) { + if (containsJoin(child)) { + return true; + } + } + } + return false; + } +} diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerRegistry.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerRegistry.java new file mode 100644 index 00000000000..580573d32be --- /dev/null +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerRegistry.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.sql.ddl.compile; + +/// Process-wide registry for the active [MaterializedViewDdlHandler]. +/// +/// Defaults to [DefaultMaterializedViewDdlHandler] (single-stage, single-source MVs). 
A distribution +/// that supports richer MV definitions registers its handler once at controller startup via +/// [#setHandler]. The [DdlCompiler] consults [#getHandler] when compiling +/// `CREATE MATERIALIZED VIEW`. This mirrors the registry pattern used elsewhere for pluggable +/// controller-side behavior (e.g. table-config validators / decorators). +public final class MaterializedViewDdlHandlerRegistry { + private static volatile MaterializedViewDdlHandler _instance = new DefaultMaterializedViewDdlHandler(); + + private MaterializedViewDdlHandlerRegistry() { + } + + /// Registers the handler used for all subsequent `CREATE MATERIALIZED VIEW` compilations. + public static void setHandler(MaterializedViewDdlHandler handler) { + _instance = handler; + } + + /// Returns the active handler (never null; defaults to single-source SSE behavior). + public static MaterializedViewDdlHandler getHandler() { + return _instance; + } +} diff --git a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java index 32f3269ff28..49f1a6739e3 100644 --- a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java +++ b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java @@ -139,6 +139,16 @@ public final class MaterializedViewPropertyRouter { /// * The caller has already validated `properties` keys do not duplicate (case-insensitive). public static void apply(Map<String, String> properties, String definedSql, @Nullable String schedule, TableConfigBuilder builder) { + apply(properties, definedSql, schedule, builder, MaterializedViewTask.TASK_TYPE); + } + + /// Same as [#apply(Map, String, String, TableConfigBuilder)] but stores the routed MV task config + /// under the given `taskType` instead of the built-in `MaterializedViewTask`. 
Used by alternative + /// [MaterializedViewDdlHandler]s that materialize the MV via a different minion task type (e.g. a + /// multi-stage-engine generator). Bare task-property keys (e.g. `bucketTimePeriod`) route the same + /// way regardless of `taskType`; only the task type the config is stored under changes. + public static void apply(Map<String, String> properties, String definedSql, + @Nullable String schedule, TableConfigBuilder builder, String mvTaskType) { Map<String, Map<String, String>> taskConfigs = new LinkedHashMap<>(); Map<String, String> customConfigs = new LinkedHashMap<>(); Map<String, String> mvTaskConfig = new LinkedHashMap<>(); @@ -244,7 +254,7 @@ public final class MaterializedViewPropertyRouter { mvTaskConfig.put(SCHEDULE_KEY, schedule); } - taskConfigs.put(MaterializedViewTask.TASK_TYPE, mvTaskConfig); + taskConfigs.put(mvTaskType, mvTaskConfig); builder.setTaskConfig(new TableTaskConfig(taskConfigs)); if (!customConfigs.isEmpty()) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
