This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new edfbf6943a5 Make CREATE MATERIALIZED VIEW DDL pluggable via MaterializedViewDdlHandler (#18639)
edfbf6943a5 is described below

commit edfbf6943a58732c7d51c74b88f495c52c25afd5
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Jun 1 12:48:33 2026 -0700

    Make CREATE MATERIALIZED VIEW DDL pluggable via MaterializedViewDdlHandler (#18639)
    
    Adds a MaterializedViewDdlHandler extension point so CREATE MATERIALIZED VIEW
    can target an alternative engine / minion task type. The default handler targets
    the single-stage engine (it re-compiles the AS clause as a single-stage Pinot query,
    rejects JOIN / multi-source definitions, and routes under MaterializedViewTask); a
    downstream distribution can register an MSE handler that accepts joins and stamps its
    own task type. The handler — installed via DdlCompiler.setMaterializedViewDdlHandler —
    owns engine-specific verification (validateDefinedQuery) and whether projection
    schema inference is allowed (supportsSchemaInference). DdlCompiler fails fast with
    a clear message if a handler returns a null/unstamped task type.
    
    MaterializedViewPropertyRouter.apply gains a taskType-parameterized overload, and
    the "MV's own task type" matching is generalized from the hard-coded
    MaterializedViewTask to that task type, so task.<taskType>.* knobs route correctly
    (and are not dropped) for custom task types.
    
    Documents the extension-point contract: a handler stamping a non-built-in task type
    owns that type's complete runtime, including definition-metadata persistence and
    consistency tracking. The built-in controller-side MV machinery
    (MaterializedViewDefinitionMetadata persistence + MaterializedViewConsistencyManager)
    keys on MaterializedViewTask and now intentionally and explicitly skips MVs stamped
    with a different task type (logged at INFO, not WARN), leaving freshness/consistency
    to the owning task type.
    
    Behavior is unchanged when no alternative handler is registered. All pinot-sql-ddl
    tests pass.
    
    Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
 .../helix/core/PinotHelixResourceManager.java      |  26 ++-
 .../apache/pinot/sql/ddl/compile/DdlCompiler.java  | 125 +++++++-------
 .../compile/DefaultMaterializedViewDdlHandler.java |  63 +++++++
 .../ddl/compile/MaterializedViewDdlHandler.java    | 118 +++++++++++++
 .../compile/MaterializedViewPropertyRouter.java    |  22 ++-
 .../compile/MaterializedViewDdlHandlerTest.java    | 189 +++++++++++++++++++++
 6 files changed, 469 insertions(+), 74 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f847aa96ad4..ed1439afbed 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -5473,8 +5473,18 @@ public class PinotHelixResourceManager {
     try {
       Map<String, String> taskConfigs = 
tableConfig.getMaterializedViewTaskConfigs();
       if (taskConfigs == null) {
-        LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping 
definition metadata persist",
-            tableNameWithType);
+        // This MV is not materialized by the built-in MaterializedViewTask. A 
downstream
+        // MaterializedViewDdlHandler / task type may materialize it via its 
own minion runtime; by
+        // contract (see MaterializedViewDdlHandler) that runtime also owns 
its definition metadata
+        // and consistency tracking, so the built-in machinery here 
intentionally does not manage it.
+        if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) {
+          LOGGER.info("MV table {} uses a non-built-in MV task type; its 
definition metadata is owned "
+              + "by that task type's runtime — skipping built-in 
MaterializedViewTask metadata persist",
+              tableNameWithType);
+        } else {
+          LOGGER.warn("MV table {} has no MaterializedViewTask config; 
skipping definition metadata persist",
+              tableNameWithType);
+        }
         return;
       }
       String definedSql = 
taskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
@@ -5705,8 +5715,16 @@ public class PinotHelixResourceManager {
       }
       Map<String, String> materializedViewTaskConfigs = 
tableConfig.getMaterializedViewTaskConfigs();
       if (materializedViewTaskConfigs == null) {
-        LOGGER.warn("MV table {} has no MaterializedViewTask config for 
consistency registration",
-            tableConfig.getTableName());
+        // Not a built-in MaterializedViewTask MV: a downstream task type 
materializes it and, by
+        // contract (see MaterializedViewDdlHandler), owns its own consistency 
tracking. The built-in
+        // MaterializedViewConsistencyManager intentionally does not register 
it here.
+        if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) {
+          LOGGER.info("MV table {} uses a non-built-in MV task type; 
consistency is owned by that task "
+              + "type's runtime — skipping built-in consistency registration", 
tableConfig.getTableName());
+        } else {
+          LOGGER.warn("MV table {} has no MaterializedViewTask config for 
consistency registration",
+              tableConfig.getTableName());
+        }
         return;
       }
       String definedSQL = 
materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
index 61bd20c45ae..0a4b791fc5b 100644
--- 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
@@ -30,7 +30,6 @@ import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.sql.SqlBasicTypeNameSpec;
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
@@ -88,9 +87,29 @@ import 
org.apache.pinot.sql.parsers.parser.SqlPinotShowTables;
 /// Stateless and thread-safe. All entry points are static.
 public final class DdlCompiler {
 
+  /// Pluggable handler for `CREATE MATERIALIZED VIEW` compilation (query validation + task-config
+  /// routing). Defaults to single-source / single-stage behavior. A distribution that supports
+  /// richer MV definitions (e.g. a multi-stage-engine MV whose `AS` clause is a JOIN) installs its
+  /// own handler via [#setMaterializedViewDdlHandler] once at controller startup, before any DDL is
+  /// served. Volatile because it is read on the DDL request path and may be set from a different
+  /// (startup) thread; not intended to be swapped while DDL is in flight.
+  private static volatile MaterializedViewDdlHandler _materializedViewDdlHandler =
+      new DefaultMaterializedViewDdlHandler();
+
   private DdlCompiler() {
   }
 
+  /// Installs the [MaterializedViewDdlHandler] used for all subsequent `CREATE MATERIALIZED VIEW`
+  /// compilations (default: [DefaultMaterializedViewDdlHandler]). Call once at startup; `null` is rejected.
+  public static void setMaterializedViewDdlHandler(MaterializedViewDdlHandler handler) {
+    _materializedViewDdlHandler = java.util.Objects.requireNonNull(handler, "handler must not be null");
+  }
+
+  /// Returns the active materialized-view DDL handler (never null; defaults to single-source SSE).
+  public static MaterializedViewDdlHandler getMaterializedViewDdlHandler() {
+    return _materializedViewDdlHandler;
+  }
+
   /// Parses and compiles a DDL statement using a stateless {@link 
DdlCompileContext}.
   ///
   /// @deprecated Use {@link #compile(String, DdlCompileContext)} and supply a 
real context.
@@ -526,11 +545,14 @@ public final class DdlCompiler {
     List<String> warnings = new ArrayList<>();
     Map<String, String> properties = 
resolveProperties(node.getProperties().getList());
 
-    // Reject JOIN early so the inferer's single-source assumption (and the 
analyzer's
-    // downstream check) cannot be violated by a definedSql we haven't 
validated yet.
-    rejectJoinInDefinedSql(node.getQuery());
+    // The engine (SSE vs MSE) is the registered handler's choice — not the 
compiler's. Extract the
+    // verbatim AS-clause text, then let the handler validate it for its 
target engine (the default
+    // handler re-compiles it as a single-stage Pinot query; an MSE handler 
does a multi-stage check).
+    // Done before column resolution / schema inference so the single-source 
inferer cannot be fed a
+    // definition the handler hasn't accepted.
+    MaterializedViewDdlHandler mvHandler = getMaterializedViewDdlHandler();
     String definedSql = extractDefinedSql(originalSql, node.getQuery());
-    verifyDefinedSqlIsParseable(definedSql);
+    mvHandler.validateDefinedQuery(node.getQuery(), definedSql, properties);
 
     // Two paths:
     //   1) Explicit column list — legacy path, will be deprecated once the 
inferer matures.
@@ -539,6 +561,15 @@ public final class DdlCompiler {
     //      surfaces a clear message when it's absent.
     List<ResolvedColumnDefinition> columns;
     if (node.getColumns().getList().isEmpty()) {
+      // Schema inference from the AS <query> projection is 
single-source-only. A handler whose
+      // definedSQL may be multi-source (e.g. a multi-stage-engine MV) reports 
that it does not
+      // support inference, so an explicit column list is required.
+      if (!mvHandler.supportsSchemaInference(properties)) {
+        throw new DdlCompilationException(
+            "CREATE MATERIALIZED VIEW requires an explicit column list for 
this materialized view; "
+                + "schema inference from the AS <query> projection is only 
supported for "
+                + "single-source materialized views.");
+      }
       // Fall back to the request-header database when the DDL itself does not 
qualify the
       // MV name — Calcite needs SOME database to resolve `FROM src` against, 
and the
       // header's intent ("operate on database X") matches where the MV will 
be created.
@@ -598,10 +629,17 @@ public final class DdlCompiler {
     TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE)
         .setTableName(tableNameForConfig)
         .setIsMaterializedView(true);
-    MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, 
builder);
+    // The handler routes the MV properties onto the builder and returns the 
minion task type it
+    // stamped (default: MaterializedViewTask). The consistency check below 
uses that task type.
+    String mvTaskType = mvHandler.applyTaskConfig(properties, definedSql, 
schedule, builder);
+    if (mvTaskType == null) {
+      throw new DdlCompilationException("MaterializedViewDdlHandler "
+          + mvHandler.getClass().getName() + " returned a null task type from 
applyTaskConfig; it "
+          + "must return the task type it stamped onto the table config.");
+    }
     TableConfig tableConfig = builder.build();
 
-    validateMaterializedViewConsistency(schema, tableConfig);
+    validateMaterializedViewConsistency(schema, tableConfig, mvTaskType);
 
     return new CompiledCreateMaterializedView(resolved.getDatabaseName(), 
schema, tableConfig,
         node.isIfNotExists(), warnings);
@@ -734,64 +772,10 @@ public final class DdlCompiler {
             + sql.length() + ".");
   }
 
-  /// Walks the parsed AS-clause AST and throws a clear, MV-specific error if 
any JOIN node
-  /// is found at any depth (top-level FROM, subquery FROM, lateral join, 
etc.). See the
-  /// call-site comment in [#compileCreateMaterializedView] for *why* we do 
this against the
-  /// AST rather than letting the downstream re-parse / analyzer surface the 
limitation.
-  private static void rejectJoinInDefinedSql(SqlNode queryNode) {
-    if (containsJoin(queryNode)) {
-      throw new DdlCompilationException(
-          "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
-              + "Materialized views currently read from a single source table; 
"
-              + "pre-join the inputs into a base table and reference that 
single table "
-              + "in the MV definition.");
-    }
-  }
-
-  /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. We 
walk via the
-  /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays 
wrapper-agnostic
-  /// (SqlOrderBy, SqlWith, SqlExplain) — mirroring how [#collectPositions] 
walks the tree.
-  /// Leaves (SqlLiteral, SqlIdentifier, ...) terminate the recursion 
naturally.
-  private static boolean containsJoin(@Nullable SqlNode node) {
-    if (node == null) {
-      return false;
-    }
-    if (node.getKind() == SqlKind.JOIN) {
-      return true;
-    }
-    if (node instanceof SqlCall) {
-      for (SqlNode child : ((SqlCall) node).getOperandList()) {
-        if (containsJoin(child)) {
-          return true;
-        }
-      }
-    } else if (node instanceof SqlNodeList) {
-      for (SqlNode child : (SqlNodeList) node) {
-        if (containsJoin(child)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  /// Sanity check: the substring we extracted must be a standalone parseable 
Pinot query.
-  /// This guards against DDL-layer slicing bugs (off-by-one in 
[#lineColToOffset], parser
-  /// position quirks) so the error surfaces in the DDL layer rather than at 
first scheduler
-  /// tick or at create-time analysis (PR 3).
-  private static void verifyDefinedSqlIsParseable(String definedSql) {
-    try {
-      CalciteSqlParser.compileToPinotQuery(definedSql);
-    } catch (Exception e) {
-      throw new DdlCompilationException(
-          "AS <query> did not re-parse as a Pinot query: " + e.getMessage()
-              + " (extracted text: " + definedSql + ")", e);
-    }
-  }
-
   /// Cross-checks: `timeColumnName` must reference a declared DATETIME 
column, and
   /// `bucketTimePeriod` must be present so the scheduler has a window size.
-  private static void validateMaterializedViewConsistency(Schema schema, 
TableConfig tableConfig) {
+  private static void validateMaterializedViewConsistency(Schema schema, 
TableConfig tableConfig,
+      String mvTaskType) {
     String timeColumnName = 
tableConfig.getValidationConfig().getTimeColumnName();
     if (timeColumnName == null || timeColumnName.isEmpty()) {
       throw new DdlCompilationException(
@@ -809,9 +793,18 @@ public final class DdlCompiler {
     }
     Map<String, String> mvTaskConfig = tableConfig.getTaskConfig() == null
         ? null
-        : 
tableConfig.getTaskConfig().getConfigsForTaskType(MaterializedViewTask.TASK_TYPE);
-    if (mvTaskConfig == null
-        || 
!mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) {
+        : tableConfig.getTaskConfig().getConfigsForTaskType(mvTaskType);
+    if (mvTaskConfig == null) {
+      // The handler's applyTaskConfig returned this task type but did not 
stamp a matching task
+      // config onto the builder — a handler-contract violation, not a user 
error. Surface it as
+      // such so a custom MaterializedViewDdlHandler author gets an actionable 
diagnostic rather
+      // than the misleading "bucketTimePeriod missing" message below.
+      throw new DdlCompilationException(
+          "MaterializedViewDdlHandler returned task type '" + mvTaskType + "' 
from applyTaskConfig "
+              + "but did not stamp a task config under it; the returned task 
type must match the "
+              + "task config written to the table.");
+    }
+    if 
(!mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) {
       throw new DdlCompilationException(
           "CREATE MATERIALIZED VIEW requires a 'bucketTimePeriod' property 
(e.g. '1d', '1h'); "
               + "it defines the time window each refresh tick materializes.");
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
new file mode 100644
index 00000000000..5d2e3c55b09
--- /dev/null
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+
+
+/// Default, single-stage-engine (SSE) materialized-view DDL handler.
+///
+/// A materialized view reads from exactly one source table: a JOIN in the `AS <query>` clause is
+/// rejected, and the MV task configuration is routed under the built-in `MaterializedViewTask` task
+/// type. This is the behavior of OSS Pinot when no alternative handler is registered.
+public class DefaultMaterializedViewDdlHandler implements MaterializedViewDdlHandler {
+
+  @Override
+  public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties) {
+    if (MaterializedViewDdlHandler.containsJoin(queryNode)) {
+      throw new DdlCompilationException(
+          "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
+              + "Materialized views currently read from a single source table; "
+              + "pre-join the inputs into a base table and reference that single table "
+              + "in the MV definition.");
+    }
+    // Re-compile the extracted definedSQL as a single-stage Pinot query. This both guards against
+    // DDL-layer slicing bugs (off-by-one in the parser-position extraction) and enforces SSE
+    // compatibility, so the error surfaces here rather than at the first scheduler tick / analysis.
+    try {
+      CalciteSqlParser.compileToPinotQuery(definedSql);
+    } catch (Exception e) {
+      throw new DdlCompilationException(
+          "AS <query> did not re-parse as a single-stage Pinot query: " + e.getMessage()
+              + " (extracted text: " + definedSql + ")", e);
+    }
+  }
+
+  @Override
+  public String applyTaskConfig(Map<String, String> properties, String definedSql,
+      @Nullable String schedule, TableConfigBuilder builder) {
+    MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder, MaterializedViewTask.TASK_TYPE);
+    return MaterializedViewTask.TASK_TYPE;
+  }
+}
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
new file mode 100644
index 00000000000..303586501ee
--- /dev/null
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/// Extension point for compiling `CREATE MATERIALIZED VIEW ... AS <query>` statements.
+///
+/// The `definedSQL` of a materialized view can be executed by either query engine; the handler — not
+/// the compiler — decides and validates accordingly. The default implementation
+/// ([DefaultMaterializedViewDdlHandler]) targets the **single-stage engine (SSE)**: it verifies the
+/// `definedSQL` compiles as a single-stage Pinot query (which restricts it to a single source table)
+/// and routes the MV task configuration under the built-in `MaterializedViewTask` task type.
+///
+/// Downstream distributions can register an alternative handler via
+/// [DdlCompiler#setMaterializedViewDdlHandler] to target the **multi-stage engine (MSE)** — accepting
+/// multi-source / JOIN definitions and routing them under a different minion task type. Because the
+/// controller validates the resulting `TableConfig` (running the task generator's validation)
+/// *before* persisting it, an MSE handler MUST stamp a task type whose generator can validate that
+/// definition — stamping the built-in `MaterializedViewTask` for a JOIN would be rejected by the
+/// single-source `MaterializedViewAnalyzer` (package `org.apache.pinot.materializedview.analysis`).
+///
+/// **Contract for non-built-in task types.** A handler that stamps a task type other than the
+/// built-in `MaterializedViewTask` is responsible for the *complete* runtime of that task type —
+/// not only the minion task generator / executor that materializes the view, but also its
+/// definition-metadata persistence and consistency tracking. OSS's built-in controller-side MV
+/// machinery (`MaterializedViewDefinitionMetadata` persistence and the
+/// `MaterializedViewConsistencyManager`) keys on `MaterializedViewTask` and therefore manages only
+/// built-in MVs; it intentionally skips MVs stamped with a different task type (the
+/// `isMaterializedView=true` flag and DDL lifecycle still apply uniformly). This keeps the built-in
+/// single-source freshness model from being applied to definitions it cannot reason about (e.g. a
+/// multi-source JOIN), and leaves freshness/consistency to the task type that owns the MV.
+public interface MaterializedViewDdlHandler {
+
+  /// Validates the MV's `AS <query>` clause for the target engine. Called before column resolution /
+  /// schema inference. The handler is responsible for whatever engine-specific checks apply — the
+  /// default (SSE) handler re-compiles `definedSql` as a single-stage Pinot query (rejecting
+  /// multi-source / JOIN queries); an MSE handler performs a multi-stage-compatible check. Throw
+  /// [DdlCompilationException] with a user-actionable message when the definition is not supported.
+  ///
+  /// @param queryNode  the parsed `AS <query>` SqlNode (use [#containsJoin] for AST-level checks)
+  /// @param definedSql the extracted verbatim `AS <query>` text (re-parse it to guard against
+  ///                   DDL-layer slicing bugs and to enforce engine compatibility)
+  /// @param properties the MV's `PROPERTIES (...)` map (raw keys/values as typed by the user)
+  void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties);
+
+  /// Whether the MV schema may be inferred from the `AS <query>` projection when the DDL omits an
+  /// explicit column list. Defaults to `true` (single-source projection inference). A handler
+  /// whose `definedSQL` may be multi-source — where projection inference is not supported — returns
+  /// `false`, so the compiler requires an explicit column list.
+  ///
+  /// @param properties the MV's `PROPERTIES (...)` map
+  default boolean supportsSchemaInference(Map<String, String> properties) {
+    return true;
+  }
+
+  /// Routes the MV's `PROPERTIES (...)` plus the synthetic `definedSql` / `schedule` onto `builder`
+  /// and returns the minion task type that was stamped (must be non-null and equal to the task-type
+  /// key written to `builder`). The returned task type is used to validate MV consistency (e.g. that
+  /// `bucketTimePeriod` is present under it).
+  ///
+  /// @param properties the MV's `PROPERTIES (...)` map
+  /// @param definedSql the verbatim `AS <query>` text
+  /// @param schedule   the Quartz cron derived from `REFRESH EVERY`, or `null` when absent
+  /// @param builder    the table-config builder to populate (already has name + isMaterializedView)
+  /// @return the minion task type stamped onto the builder (e.g. `MaterializedViewTask`)
+  String applyTaskConfig(Map<String, String> properties, String definedSql, @Nullable String schedule,
+      TableConfigBuilder builder);
+
+  /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. Walks the
+  /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays wrapper-agnostic
+  /// (SqlOrderBy, SqlWith, SqlExplain). Leaves (SqlLiteral, SqlIdentifier, ...) terminate naturally.
+  static boolean containsJoin(@Nullable SqlNode node) {
+    if (node == null) {
+      return false;
+    }
+    if (node.getKind() == SqlKind.JOIN) {
+      return true;
+    }
+    if (node instanceof SqlCall) {
+      for (SqlNode child : ((SqlCall) node).getOperandList()) {
+        if (containsJoin(child)) {
+          return true;
+        }
+      }
+    } else if (node instanceof SqlNodeList) {
+      for (SqlNode child : (SqlNodeList) node) {
+        if (containsJoin(child)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
index 32f3269ff28..9c2eaf0e560 100644
--- 
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
+++ 
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
@@ -139,6 +139,16 @@ public final class MaterializedViewPropertyRouter {
   ///   * The caller has already validated `properties` keys do not duplicate 
(case-insensitive).
   public static void apply(Map<String, String> properties, String definedSql,
       @Nullable String schedule, TableConfigBuilder builder) {
+    apply(properties, definedSql, schedule, builder, 
MaterializedViewTask.TASK_TYPE);
+  }
+
+  /// Same as [#apply(Map, String, String, TableConfigBuilder)] but stores the 
routed MV task config
+  /// under the given `taskType` instead of the built-in 
`MaterializedViewTask`. Used by alternative
+  /// [MaterializedViewDdlHandler]s that materialize the MV via a different 
minion task type (e.g. a
+  /// multi-stage-engine generator). Bare task-property keys (e.g. 
`bucketTimePeriod`) route the same
+  /// way regardless of `taskType`; only the task type the config is stored 
under changes.
+  public static void apply(Map<String, String> properties, String definedSql,
+      @Nullable String schedule, TableConfigBuilder builder, String 
mvTaskType) {
     Map<String, Map<String, String>> taskConfigs = new LinkedHashMap<>();
     Map<String, String> customConfigs = new LinkedHashMap<>();
     Map<String, String> mvTaskConfig = new LinkedHashMap<>();
@@ -210,7 +220,11 @@ public final class MaterializedViewPropertyRouter {
         }
         String taskType = afterPrefix.substring(0, dot);
         String taskKey = afterPrefix.substring(dot + 1);
-        if (MaterializedViewTask.TASK_TYPE.equals(taskType)
+        // Compare against the MV's own task type (mvTaskType), not the 
hard-coded built-in, so a
+        // custom handler's task type is recognized as "this MV's task config" 
rather than treated
+        // as an unrelated composed task type (which would be dropped by the 
put(mvTaskType, ...)
+        // below).
+        if (mvTaskType.equals(taskType)
             && (SCHEDULE_KEY.equals(taskKey.toLowerCase(Locale.ROOT))
                 || 
MaterializedViewTask.DEFINED_SQL_KEY.equalsIgnoreCase(taskKey))) {
           throw new DdlCompilationException(
@@ -218,9 +232,9 @@ public final class MaterializedViewPropertyRouter {
                   + "(REFRESH EVERY for 'schedule', AS <query> for 
'definedSQL'); "
                   + "remove it from PROPERTIES.");
         }
-        if (MaterializedViewTask.TASK_TYPE.equals(taskType)) {
+        if (mvTaskType.equals(taskType)) {
           // Canonicalize the knob casing the same way the bare-form branch 
does, so
-          // `task.MaterializedViewTask.BUCKETTIMEPERIOD` and bare 
`BUCKETTIMEPERIOD` end up
+          // `task.<mvTaskType>.BUCKETTIMEPERIOD` and bare `BUCKETTIMEPERIOD` 
end up
           // under the same on-wire key (the constant casing in 
CommonConstants).
           String canonical = 
TASK_CONFIG_KEYS.getOrDefault(taskKey.toLowerCase(Locale.ROOT), taskKey);
           mvTaskConfig.put(canonical, value);
@@ -244,7 +258,7 @@ public final class MaterializedViewPropertyRouter {
       mvTaskConfig.put(SCHEDULE_KEY, schedule);
     }
 
-    taskConfigs.put(MaterializedViewTask.TASK_TYPE, mvTaskConfig);
+    taskConfigs.put(mvTaskType, mvTaskConfig);
     builder.setTaskConfig(new TableTaskConfig(taskConfigs));
 
     if (!customConfigs.isEmpty()) {
diff --git 
a/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java
 
b/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java
new file mode 100644
index 00000000000..300e08f48c5
--- /dev/null
+++ 
b/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Tests the [MaterializedViewDdlHandler] extension point: a registered 
handler can accept a JOIN in
+/// the `AS` clause and route the MV task config under an alternative task 
type, while the default
+/// handler preserves single-source / single-stage behavior.
+public class MaterializedViewDdlHandlerTest {
+  private static final String JOIN_DDL =
+      "CREATE MATERIALIZED VIEW mv ("
+          + "  ts TIMESTAMP DATETIME FORMAT '1:MILLISECONDS:TIMESTAMP' 
GRANULARITY '1:DAYS',"
+          + "  country STRING,"
+          + "  amount_sum DOUBLE METRIC"
+          + ")"
+          + " REFRESH EVERY 1 DAY"
+          + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d')"
+          + " AS SELECT f.ts, d.country, SUM(f.amount) AS amount_sum"
+          + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts, d.country";
+
+  @AfterMethod
+  public void restoreDefaultHandler() {
+    /// The handler is process-wide; restore the default so sibling tests see 
single-source behavior.
+    DdlCompiler.setMaterializedViewDdlHandler(new 
DefaultMaterializedViewDdlHandler());
+  }
+
+  @Test
+  public void defaultsToSingleSourceHandler() {
+    assertTrue(DdlCompiler.getMaterializedViewDdlHandler() instanceof 
DefaultMaterializedViewDdlHandler);
+  }
+
+  @Test
+  public void defaultHandlerRejectsJoin() {
+    DdlCompilationException e =
+        expectThrows(DdlCompilationException.class, () -> 
DdlCompiler.compile(JOIN_DDL));
+    assertTrue(e.getMessage().contains("JOIN"), e.getMessage());
+  }
+
+  @Test
+  public void registeredHandlerAcceptsJoinAndRoutesToCustomTaskType() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler());
+
+    CompiledDdl compiled = DdlCompiler.compile(JOIN_DDL);
+    assertEquals(compiled.getOperation(), 
DdlOperation.CREATE_MATERIALIZED_VIEW);
+    TableConfig tableConfig = ((CompiledCreateMaterializedView) 
compiled).getTableConfig();
+    assertTrue(tableConfig.isMaterializedView());
+
+    Map<String, Map<String, String>> taskTypes = 
tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+    /// Stamped under the handler's task type; the built-in single-stage type 
is absent.
+    assertTrue(taskTypes.containsKey(MultiSourceTestHandler.TASK_TYPE));
+    assertFalse(taskTypes.containsKey(MaterializedViewTask.TASK_TYPE));
+
+    Map<String, String> mvTaskConfig = 
taskTypes.get(MultiSourceTestHandler.TASK_TYPE);
+    assertEquals(mvTaskConfig.get(MaterializedViewTask.DEFINED_SQL_KEY),
+        "SELECT f.ts, d.country, SUM(f.amount) AS amount_sum"
+            + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts, 
d.country");
+    /// bucketTimePeriod routed under the custom task type, and the 
consistency check passed against it.
+    
assertEquals(mvTaskConfig.get(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY), 
"1d");
+  }
+
+  @Test
+  public void customTaskTypePrefixedKnobsArePreserved() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler());
+
+    /// A `task.<customTaskType>.<knob>` property must survive into the 
stamped task config (it must
+    /// not be overwritten by the synthetic task-config map under the same 
task type).
+    CompiledDdl compiled = DdlCompiler.compile(
+        "CREATE MATERIALIZED VIEW mv ("
+            + "  ts TIMESTAMP DATETIME FORMAT '1:MILLISECONDS:TIMESTAMP' 
GRANULARITY '1:DAYS',"
+            + "  amount_sum DOUBLE METRIC"
+            + ")"
+            + " REFRESH EVERY 1 DAY"
+            + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = 
'1d',"
+            + "   'task." + MultiSourceTestHandler.TASK_TYPE + ".customKnob' = 
'v')"
+            + " AS SELECT f.ts, SUM(f.amount) AS amount_sum"
+            + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts");
+    Map<String, String> mvTaskConfig = ((CompiledCreateMaterializedView) 
compiled).getTableConfig()
+        
.getTaskConfig().getTaskTypeConfigsMap().get(MultiSourceTestHandler.TASK_TYPE);
+    assertEquals(mvTaskConfig.get("customKnob"), "v");
+    /// Synthetic / bare-form keys remain present alongside the prefixed knob.
+    
assertEquals(mvTaskConfig.get(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY), 
"1d");
+    assertTrue(mvTaskConfig.containsKey(MaterializedViewTask.DEFINED_SQL_KEY));
+  }
+
+  @Test
+  public void registeredHandlerJoinWithoutExplicitColumnsRejected() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler());
+
+    /// Even when the handler permits a JOIN, schema inference (no column 
list) is single-source-only.
+    DdlCompilationException e = expectThrows(DdlCompilationException.class, () 
-> DdlCompiler.compile(
+        "CREATE MATERIALIZED VIEW mv"
+            + " REFRESH EVERY 1 DAY"
+            + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = 
'1d')"
+            + " AS SELECT f.ts, d.country FROM fact f JOIN dim d ON f.id = 
d.id"));
+    assertTrue(e.getMessage().contains("explicit column list"), 
e.getMessage());
+  }
+
+  @Test
+  public void handlerReturningNullTaskTypeFailsClearly() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MaterializedViewDdlHandler() 
{
+      @Override
+      public void validateDefinedQuery(SqlNode queryNode, String definedSql, 
Map<String, String> properties) {
+      }
+
+      @Override
+      public String applyTaskConfig(Map<String, String> properties, String 
definedSql,
+          @Nullable String schedule, TableConfigBuilder builder) {
+        return null;
+      }
+    });
+    DdlCompilationException e =
+        expectThrows(DdlCompilationException.class, () -> 
DdlCompiler.compile(JOIN_DDL));
+    assertTrue(e.getMessage().contains("null task type"), e.getMessage());
+  }
+
+  @Test
+  public void handlerReturningUnstampedTaskTypeFailsClearly() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MaterializedViewDdlHandler() 
{
+      @Override
+      public void validateDefinedQuery(SqlNode queryNode, String definedSql, 
Map<String, String> properties) {
+      }
+
+      @Override
+      public String applyTaskConfig(Map<String, String> properties, String 
definedSql,
+          @Nullable String schedule, TableConfigBuilder builder) {
+        /// Stamp under one task type but return a different one — a 
handler-contract violation.
+        MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, 
builder, "StampedTask");
+        return "ReturnedButNotStampedTask";
+      }
+    });
+    DdlCompilationException e =
+        expectThrows(DdlCompilationException.class, () -> 
DdlCompiler.compile(JOIN_DDL));
+    assertTrue(e.getMessage().contains("did not stamp a task config"), 
e.getMessage());
+  }
+
+  /// Test handler that permits multi-source (JOIN) definitions and routes the 
MV task config under a
+  /// distinct task type via the task-type-parameterized 
[MaterializedViewPropertyRouter#apply].
+  private static final class MultiSourceTestHandler implements 
MaterializedViewDdlHandler {
+    static final String TASK_TYPE = "CustomMaterializedViewTask";
+
+    @Override
+    public void validateDefinedQuery(SqlNode queryNode, String definedSql, 
Map<String, String> properties) {
+      /// Multi-source allowed: no JOIN rejection, no single-stage 
re-compilation.
+    }
+
+    @Override
+    public boolean supportsSchemaInference(Map<String, String> properties) {
+      /// Multi-source projection inference is not supported; an explicit 
column list is required.
+      return false;
+    }
+
+    @Override
+    public String applyTaskConfig(Map<String, String> properties, String 
definedSql,
+        @Nullable String schedule, TableConfigBuilder builder) {
+      MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, 
builder, TASK_TYPE);
+      return TASK_TYPE;
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to