This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch mse-mv-ddl-extensibility
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 83c9a1fda7e7ffbd2b0762da217c5f3868027fad
Author: Xiang Fu <[email protected]>
AuthorDate: Sat May 30 00:26:11 2026 -0700

    Make CREATE MATERIALIZED VIEW DDL pluggable via MaterializedViewDdlHandler
    
    OSS Pinot's CREATE MATERIALIZED VIEW is single-stage-engine only: a JOIN in
    the AS <query> clause is rejected, and the MV is always routed under the
    MaterializedViewTask task type. Downstream distributions that materialize an MV
    via a different engine/task type (e.g. a multi-stage-engine generator that
    supports JOINs) previously had no clean way to override this.
    
    Introduce a MaterializedViewDdlHandler SPI that owns the MV-specific compile
    decisions — query validation (join policy) and task-config routing (which task
    type to stamp). DefaultMaterializedViewDdlHandler preserves today's behavior
    exactly (reject JOINs, route under MaterializedViewTask), and is the default in
    MaterializedViewDdlHandlerRegistry. DdlCompiler.compileCreateMaterializedView
    delegates to the registered handler and validates MV consistency against the
    task type the handler stamped.
    
    Also:
    - MaterializedViewPropertyRouter.apply gains a taskType-parameterized overload
      so a handler can route the MV task config under an alternative task type.
    - verifyDefinedSqlIsParseable now does a syntactic parse
      (compileToSqlNodeAndOptions) instead of full single-stage compilation
      (compileToPinotQuery), so the slicing-bug guard accepts multi-stage shapes
      (e.g. JOINs) a handler may permit; engine-specific validity is still enforced
      later by the MV analyzer / task generator.
    
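    Behavior is unchanged when no alternative handler is registered, except that the
    definedSql re-parse guard is now syntactic only (see above): a query that parses
    but fails single-stage compilation may now be reported later (by the MV analyzer /
    task generator) instead of by the DDL compiler.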
    All existing pinot-sql-ddl tests pass.
    
    Co-Authored-By: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../apache/pinot/sql/ddl/compile/DdlCompiler.java  | 78 +++++++-----------
 .../compile/DefaultMaterializedViewDdlHandler.java | 52 ++++++++++++
 .../ddl/compile/MaterializedViewDdlHandler.java    | 93 ++++++++++++++++++++++
 .../MaterializedViewDdlHandlerRegistry.java        | 43 ++++++++++
 .../compile/MaterializedViewPropertyRouter.java    | 12 ++-
 5 files changed, 227 insertions(+), 51 deletions(-)

diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
index 61bd20c45ae..df6b21031a0 100644
--- 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
@@ -30,7 +30,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlBasicTypeNameSpec;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
@@ -526,9 +525,12 @@ public final class DdlCompiler {
     List<String> warnings = new ArrayList<>();
     Map<String, String> properties = 
resolveProperties(node.getProperties().getList());
 
-    // Reject JOIN early so the inferer's single-source assumption (and the 
analyzer's
-    // downstream check) cannot be violated by a definedSql we haven't 
validated yet.
-    rejectJoinInDefinedSql(node.getQuery());
+    // Validate the AS-clause query via the registered handler before column 
resolution / schema
+    // inference. The default handler rejects JOINs (single-source SSE MVs); 
an alternative handler
+    // (e.g. a multi-stage-engine MV) may accept them. Done early so the 
single-source inferer's
+    // assumption cannot be violated by a definedSql we haven't validated yet.
+    MaterializedViewDdlHandler mvHandler = 
MaterializedViewDdlHandlerRegistry.getHandler();
+    mvHandler.validateDefinedQuery(node.getQuery(), properties);
     String definedSql = extractDefinedSql(originalSql, node.getQuery());
     verifyDefinedSqlIsParseable(definedSql);
 
@@ -539,6 +541,14 @@ public final class DdlCompiler {
     //      surfaces a clear message when it's absent.
     List<ResolvedColumnDefinition> columns;
     if (node.getColumns().getList().isEmpty()) {
+      // Schema inference from the AS <query> projection is 
single-source-only. A handler that
+      // accepted a JOIN above (e.g. multi-stage-engine MV) must require an 
explicit column list,
+      // because the inferer cannot resolve a multi-source projection.
+      if (MaterializedViewDdlHandler.containsJoin(node.getQuery())) {
+        throw new DdlCompilationException(
+            "CREATE MATERIALIZED VIEW with a JOIN in the AS clause requires an 
explicit column "
+                + "list; schema inference is only supported for single-source 
materialized views.");
+      }
       // Fall back to the request-header database when the DDL itself does not 
qualify the
       // MV name — Calcite needs SOME database to resolve `FROM src` against, 
and the
       // header's intent ("operate on database X") matches where the MV will 
be created.
@@ -598,10 +608,12 @@ public final class DdlCompiler {
     TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName(tableNameForConfig)
         .setIsMaterializedView(true);
-    MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, 
builder);
+    // The handler routes the MV properties onto the builder and returns the 
minion task type it
+    // stamped (default: MaterializedViewTask). The consistency check below 
uses that task type.
+    String mvTaskType = mvHandler.applyTaskConfig(properties, definedSql, 
schedule, builder);
     TableConfig tableConfig = builder.build();
 
-    validateMaterializedViewConsistency(schema, tableConfig);
+    validateMaterializedViewConsistency(schema, tableConfig, mvTaskType);
 
     return new CompiledCreateMaterializedView(resolved.getDatabaseName(), 
schema, tableConfig,
         node.isIfNotExists(), warnings);
@@ -734,54 +746,19 @@ public final class DdlCompiler {
             + sql.length() + ".");
   }
 
-  /// Walks the parsed AS-clause AST and throws a clear, MV-specific error if 
any JOIN node
-  /// is found at any depth (top-level FROM, subquery FROM, lateral join, 
etc.). See the
-  /// call-site comment in [#compileCreateMaterializedView] for *why* we do 
this against the
-  /// AST rather than letting the downstream re-parse / analyzer surface the 
limitation.
-  private static void rejectJoinInDefinedSql(SqlNode queryNode) {
-    if (containsJoin(queryNode)) {
-      throw new DdlCompilationException(
-          "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
-              + "Materialized views currently read from a single source table; 
"
-              + "pre-join the inputs into a base table and reference that 
single table "
-              + "in the MV definition.");
-    }
-  }
-
-  /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. We 
walk via the
-  /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays 
wrapper-agnostic
-  /// (SqlOrderBy, SqlWith, SqlExplain) — mirroring how [#collectPositions] 
walks the tree.
-  /// Leaves (SqlLiteral, SqlIdentifier, ...) terminate the recursion 
naturally.
-  private static boolean containsJoin(@Nullable SqlNode node) {
-    if (node == null) {
-      return false;
-    }
-    if (node.getKind() == SqlKind.JOIN) {
-      return true;
-    }
-    if (node instanceof SqlCall) {
-      for (SqlNode child : ((SqlCall) node).getOperandList()) {
-        if (containsJoin(child)) {
-          return true;
-        }
-      }
-    } else if (node instanceof SqlNodeList) {
-      for (SqlNode child : (SqlNodeList) node) {
-        if (containsJoin(child)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
   /// Sanity check: the substring we extracted must be a standalone parseable 
Pinot query.
   /// This guards against DDL-layer slicing bugs (off-by-one in 
[#lineColToOffset], parser
   /// position quirks) so the error surfaces in the DDL layer rather than at 
first scheduler
   /// tick or at create-time analysis (PR 3).
+  ///
+  /// Uses a syntactic parse (`compileToSqlNodeAndOptions`) rather than full 
single-stage
+  /// compilation (`compileToPinotQuery`): the slicing-bug guard only needs to 
confirm the extracted
+  /// text is a well-formed query, and the syntactic parse accepts multi-stage 
shapes (e.g. JOINs)
+  /// that a [MaterializedViewDdlHandler] may permit. Engine-specific validity 
is enforced later by
+  /// the MV analyzer / task generator for whichever task type the handler 
stamped.
   private static void verifyDefinedSqlIsParseable(String definedSql) {
     try {
-      CalciteSqlParser.compileToPinotQuery(definedSql);
+      CalciteSqlParser.compileToSqlNodeAndOptions(definedSql);
     } catch (Exception e) {
       throw new DdlCompilationException(
           "AS <query> did not re-parse as a Pinot query: " + e.getMessage()
@@ -791,7 +768,8 @@ public final class DdlCompiler {
 
   /// Cross-checks: `timeColumnName` must reference a declared DATETIME 
column, and
   /// `bucketTimePeriod` must be present so the scheduler has a window size.
-  private static void validateMaterializedViewConsistency(Schema schema, 
TableConfig tableConfig) {
+  private static void validateMaterializedViewConsistency(Schema schema, 
TableConfig tableConfig,
+      String mvTaskType) {
     String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
     if (timeColumnName == null || timeColumnName.isEmpty()) {
       throw new DdlCompilationException(
@@ -809,7 +787,7 @@ public final class DdlCompiler {
     }
     Map<String, String> mvTaskConfig = tableConfig.getTaskConfig() == null
         ? null
-        : 
tableConfig.getTaskConfig().getConfigsForTaskType(MaterializedViewTask.TASK_TYPE);
+        : tableConfig.getTaskConfig().getConfigsForTaskType(mvTaskType);
     if (mvTaskConfig == null
         || 
!mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) {
       throw new DdlCompilationException(
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
new file mode 100644
index 00000000000..7b49d7c0832
--- /dev/null
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/// Built-in single-stage-engine (SSE) materialized-view DDL handler, used when none is registered.
+///
+/// Every MV reads from exactly one source table: a JOIN anywhere in the `AS <query>` clause is
+/// refused, and the MV task configuration is routed under the stock `MaterializedViewTask` type.
+public class DefaultMaterializedViewDdlHandler implements MaterializedViewDdlHandler {
+
+  @Override
+  public void validateDefinedQuery(SqlNode queryNode, Map<String, String> properties) {
+    if (!MaterializedViewDdlHandler.containsJoin(queryNode)) {
+      return;
+    }
+    throw new DdlCompilationException("CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
+        + "Materialized views currently read from a single source table; "
+        + "pre-join the inputs into a base table and reference that single table in the MV definition.");
+  }
+
+  @Override
+  public String applyTaskConfig(Map<String, String> properties, String definedSql,
+      @Nullable String schedule, TableConfigBuilder builder) {
+    String taskType = MaterializedViewTask.TASK_TYPE;
+    // Stamp and report one task type so the compiler's consistency check reads the right config.
+    MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder, taskType);
+    return taskType;
+  }
+}
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
new file mode 100644
index 00000000000..34bb8ea8696
--- /dev/null
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/// Extension point for compiling `CREATE MATERIALIZED VIEW ... AS <query>` statements.
+///
+/// The default implementation ([DefaultMaterializedViewDdlHandler]) restricts an MV to a single
+/// source table (the single-stage-engine model: a JOIN in the `AS` clause is rejected) and routes
+/// the MV task configuration under the built-in `MaterializedViewTask` task type.
+///
+/// Downstream distributions can register an alternative handler via
+/// [MaterializedViewDdlHandlerRegistry#setHandler] to support richer MV definitions — for example a
+/// multi-stage-engine MV whose `AS <query>` is a JOIN, materialized by a different minion task type.
+/// Because the controller validates the resulting `TableConfig` (running the task generator's
+/// validation) *before* persisting it, a handler that wants to accept a JOIN MUST stamp a task type
+/// whose generator can validate that definition — stamping the built-in `MaterializedViewTask` for a
+/// JOIN would be rejected by the single-source `MaterializedViewAnalyzer`
+/// (`org.apache.pinot.materializedview.analysis`). The two-method split mirrors the compiler's order:
+/// the query is validated up front (before column resolution / schema inference), task routing
+/// happens after the schema has been built.
+public interface MaterializedViewDdlHandler {
+
+  /// Validates the parsed `AS <query>` clause against the MV's DDL properties. Called before column
+  /// resolution / schema inference. Implementations should throw [DdlCompilationException] with a
+  /// user-actionable message when the definition is not supported.
+  ///
+  /// @param queryNode  the parsed `AS <query>` SqlNode
+  /// @param properties the MV's `PROPERTIES (...)` map (raw keys/values as typed by the user)
+  void validateDefinedQuery(SqlNode queryNode, Map<String, String> properties);
+
+  /// Routes the MV's `PROPERTIES (...)` plus the synthetic `definedSql` / `schedule` onto `builder` and
+  /// returns the (non-null) task type the MV task config was stored under. The compiler uses it to check
+  /// MV consistency (e.g. that `bucketTimePeriod` is present under that task type).
+  ///
+  /// @param properties the MV's `PROPERTIES (...)` map
+  /// @param definedSql the verbatim `AS <query>` text
+  /// @param schedule   the Quartz cron derived from `REFRESH EVERY`, or {@code null} when absent
+  /// @param builder    the table-config builder to populate (already has name + isMaterializedView)
+  /// @return the (non-null) minion task type stamped onto the builder (e.g. {@code MaterializedViewTask})
+  String applyTaskConfig(Map<String, String> properties, String definedSql, @Nullable String schedule,
+      TableConfigBuilder builder);
+
+  /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call; a `null` node yields false.
+  /// Walks the `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays wrapper-agnostic
+  /// (SqlOrderBy, SqlWith, SqlExplain). Leaves (SqlLiteral, SqlIdentifier, ...) terminate naturally.
+  static boolean containsJoin(@Nullable SqlNode node) {
+    if (node == null) {
+      return false;
+    }
+    if (node.getKind() == SqlKind.JOIN) {
+      return true;
+    }
+    if (node instanceof SqlCall) {
+      for (SqlNode child : ((SqlCall) node).getOperandList()) {
+        if (containsJoin(child)) {
+          return true;
+        }
+      }
+    } else if (node instanceof SqlNodeList) {
+      for (SqlNode child : (SqlNodeList) node) {
+        if (containsJoin(child)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerRegistry.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerRegistry.java
new file mode 100644
index 00000000000..580573d32be
--- /dev/null
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerRegistry.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Objects;
+
+
+/// Process-wide registry for the active [MaterializedViewDdlHandler].
+///
+/// Defaults to [DefaultMaterializedViewDdlHandler] (single-stage, single-source MVs). A distribution
+/// that supports richer MV definitions registers its handler once at controller startup via
+/// [#setHandler]. The [DdlCompiler] consults [#getHandler] when compiling
+/// `CREATE MATERIALIZED VIEW`. This mirrors the registry pattern used elsewhere for pluggable
+/// controller-side behavior (e.g. table-config validators / decorators).
+///
+/// The handler field is `volatile`, so a handler registered on one thread is visible to
+/// compilations that subsequently run on other threads.
+public final class MaterializedViewDdlHandlerRegistry {
+  private static volatile MaterializedViewDdlHandler _instance = new DefaultMaterializedViewDdlHandler();
+
+  private MaterializedViewDdlHandlerRegistry() {
+  }
+
+  /// Registers the handler used for all subsequent `CREATE MATERIALIZED VIEW` compilations.
+  ///
+  /// @param handler the handler to install; must not be `null`
+  /// @throws NullPointerException if `handler` is `null`. Rejecting it here (rather than at the
+  ///     next DDL compilation) preserves the never-null guarantee of [#getHandler].
+  public static void setHandler(MaterializedViewDdlHandler handler) {
+    _instance = Objects.requireNonNull(handler, "MaterializedViewDdlHandler must not be null");
+  }
+
+  /// Returns the active handler (never null; defaults to single-source SSE behavior).
+  public static MaterializedViewDdlHandler getHandler() {
+    return _instance;
+  }
+}
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
index 32f3269ff28..49f1a6739e3 100644
--- 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
@@ -139,6 +139,16 @@ public final class MaterializedViewPropertyRouter {
   ///   * The caller has already validated `properties` keys do not duplicate 
(case-insensitive).
   public static void apply(Map<String, String> properties, String definedSql,
       @Nullable String schedule, TableConfigBuilder builder) {
+    apply(properties, definedSql, schedule, builder, 
MaterializedViewTask.TASK_TYPE);
+  }
+
+  /// Same as [#apply(Map, String, String, TableConfigBuilder)] but stores the 
routed MV task config
+  /// under the given `taskType` instead of the built-in 
`MaterializedViewTask`. Used by alternative
+  /// [MaterializedViewDdlHandler]s that materialize the MV via a different 
minion task type (e.g. a
+  /// multi-stage-engine generator). Bare task-property keys (e.g. 
`bucketTimePeriod`) route the same
+  /// way regardless of `taskType`; only the task type the config is stored 
under changes.
+  public static void apply(Map<String, String> properties, String definedSql,
+      @Nullable String schedule, TableConfigBuilder builder, String 
mvTaskType) {
     Map<String, Map<String, String>> taskConfigs = new LinkedHashMap<>();
     Map<String, String> customConfigs = new LinkedHashMap<>();
     Map<String, String> mvTaskConfig = new LinkedHashMap<>();
@@ -244,7 +254,7 @@ public final class MaterializedViewPropertyRouter {
       mvTaskConfig.put(SCHEDULE_KEY, schedule);
     }
 
-    taskConfigs.put(MaterializedViewTask.TASK_TYPE, mvTaskConfig);
+    taskConfigs.put(mvTaskType, mvTaskConfig);
     builder.setTaskConfig(new TableTaskConfig(taskConfigs));
 
     if (!customConfigs.isEmpty()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to