This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new edfbf6943a5 Make CREATE MATERIALIZED VIEW DDL pluggable via
MaterializedViewDdlHandler (#18639)
edfbf6943a5 is described below
commit edfbf6943a58732c7d51c74b88f495c52c25afd5
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Jun 1 12:48:33 2026 -0700
Make CREATE MATERIALIZED VIEW DDL pluggable via MaterializedViewDdlHandler (#18639)
Adds a MaterializedViewDdlHandler extension point so CREATE MATERIALIZED VIEW
can target an alternative engine / minion task type. The default handler
targets the single-stage engine: it re-compiles the AS clause as a
single-stage Pinot query, rejects JOIN / multi-source definitions, and routes
the MV under MaterializedViewTask. A downstream distribution can register an
MSE handler that accepts joins and stamps its own task type. The handler,
installed via DdlCompiler.setMaterializedViewDdlHandler, owns engine-specific
verification (validateDefinedQuery) and decides whether projection schema
inference is allowed (supportsSchemaInference). DdlCompiler fails fast with a
clear message if a handler returns a null task type, or returns a task type
under which it did not stamp a task config.
MaterializedViewPropertyRouter.apply gains a taskType-parameterized overload.
Matching of "the MV's own task type" is generalized from the hard-coded
MaterializedViewTask to that task type, so task.<taskType>.* knobs route
correctly (and are not dropped) for custom task types.
Documents the extension-point contract: a handler that stamps a non-built-in
task type owns that type's complete runtime, including definition-metadata
persistence and consistency tracking. The built-in controller-side MV
machinery (MaterializedViewDefinitionMetadata persistence and
MaterializedViewConsistencyManager) keys on MaterializedViewTask. It now
intentionally and explicitly skips MVs stamped with a different task type
(logged at INFO, not WARN), leaving freshness/consistency to the owning task
type.
Behavior is unchanged when no alternative handler is registered. All
pinot-sql-ddl tests pass.
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.../helix/core/PinotHelixResourceManager.java | 26 ++-
.../apache/pinot/sql/ddl/compile/DdlCompiler.java | 125 +++++++-------
.../compile/DefaultMaterializedViewDdlHandler.java | 63 +++++++
.../ddl/compile/MaterializedViewDdlHandler.java | 118 +++++++++++++
.../compile/MaterializedViewPropertyRouter.java | 22 ++-
.../compile/MaterializedViewDdlHandlerTest.java | 189 +++++++++++++++++++++
6 files changed, 469 insertions(+), 74 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index f847aa96ad4..ed1439afbed 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -5473,8 +5473,18 @@ public class PinotHelixResourceManager {
try {
Map<String, String> taskConfigs =
tableConfig.getMaterializedViewTaskConfigs();
if (taskConfigs == null) {
- LOGGER.warn("MV table {} has no MaterializedViewTask config; skipping
definition metadata persist",
- tableNameWithType);
+ // This MV is not materialized by the built-in MaterializedViewTask. A
downstream
+ // MaterializedViewDdlHandler / task type may materialize it via its
own minion runtime; by
+ // contract (see MaterializedViewDdlHandler) that runtime also owns
its definition metadata
+ // and consistency tracking, so the built-in machinery here
intentionally does not manage it.
+ if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) {
+ LOGGER.info("MV table {} uses a non-built-in MV task type; its
definition metadata is owned "
+ + "by that task type's runtime — skipping built-in
MaterializedViewTask metadata persist",
+ tableNameWithType);
+ } else {
+ LOGGER.warn("MV table {} has no MaterializedViewTask config;
skipping definition metadata persist",
+ tableNameWithType);
+ }
return;
}
String definedSql =
taskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
@@ -5705,8 +5715,16 @@ public class PinotHelixResourceManager {
}
Map<String, String> materializedViewTaskConfigs =
tableConfig.getMaterializedViewTaskConfigs();
if (materializedViewTaskConfigs == null) {
- LOGGER.warn("MV table {} has no MaterializedViewTask config for
consistency registration",
- tableConfig.getTableName());
+ // Not a built-in MaterializedViewTask MV: a downstream task type
materializes it and, by
+ // contract (see MaterializedViewDdlHandler), owns its own consistency
tracking. The built-in
+ // MaterializedViewConsistencyManager intentionally does not register
it here.
+ if (tableConfig.hasMaterializedViewTaskWithDefinedSql()) {
+ LOGGER.info("MV table {} uses a non-built-in MV task type;
consistency is owned by that task "
+ + "type's runtime — skipping built-in consistency registration",
tableConfig.getTableName());
+ } else {
+ LOGGER.warn("MV table {} has no MaterializedViewTask config for
consistency registration",
+ tableConfig.getTableName());
+ }
return;
}
String definedSQL =
materializedViewTaskConfigs.get(CommonConstants.MaterializedViewTask.DEFINED_SQL_KEY);
diff --git
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
index 61bd20c45ae..0a4b791fc5b 100644
---
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
+++
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DdlCompiler.java
@@ -30,7 +30,6 @@ import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.sql.SqlBasicTypeNameSpec;
import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlIdentifier;
-import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
@@ -88,9 +87,29 @@ import
org.apache.pinot.sql.parsers.parser.SqlPinotShowTables;
/// Stateless and thread-safe. All entry points are static.
public final class DdlCompiler {
+  /// Pluggable handler for `CREATE MATERIALIZED VIEW` compilation (query validation + task-config
+  /// routing). Defaults to single-source / single-stage behavior. A distribution that supports
+  /// richer MV definitions (e.g. a multi-stage-engine MV whose `AS` clause is a JOIN) installs its
+  /// own handler via [#setMaterializedViewDdlHandler] once at controller startup, before any DDL is
+  /// served. Volatile because it is read on the DDL request path and may be set from a different
+  /// (startup) thread; not intended to be swapped while DDL is in flight.
+  private static volatile MaterializedViewDdlHandler _materializedViewDdlHandler =
+      new DefaultMaterializedViewDdlHandler();
+
   private DdlCompiler() {
   }
+  /// Installs the [MaterializedViewDdlHandler] used for all subsequent `CREATE MATERIALIZED VIEW`
+  /// compilations. Call once at controller startup; defaults to [DefaultMaterializedViewDdlHandler].
+  ///
+  /// `null` is rejected here, at install time, so that [#getMaterializedViewDdlHandler] keeps its
+  /// never-null guarantee. Otherwise the mistake would only surface later as an NPE on the
+  /// DDL request path, far from its cause.
+  ///
+  /// @param handler the handler to install; must not be `null`
+  /// @throws IllegalArgumentException if `handler` is `null`
+  public static void setMaterializedViewDdlHandler(MaterializedViewDdlHandler handler) {
+    if (handler == null) {
+      throw new IllegalArgumentException("MaterializedViewDdlHandler must not be null");
+    }
+    _materializedViewDdlHandler = handler;
+  }
+
+  /// Returns the active materialized-view DDL handler (never null; defaults to single-source SSE).
+  public static MaterializedViewDdlHandler getMaterializedViewDdlHandler() {
+    return _materializedViewDdlHandler;
+  }
+
/// Parses and compiles a DDL statement using a stateless {@link
DdlCompileContext}.
///
/// @deprecated Use {@link #compile(String, DdlCompileContext)} and supply a
real context.
@@ -526,11 +545,14 @@ public final class DdlCompiler {
List<String> warnings = new ArrayList<>();
Map<String, String> properties =
resolveProperties(node.getProperties().getList());
- // Reject JOIN early so the inferer's single-source assumption (and the
analyzer's
- // downstream check) cannot be violated by a definedSql we haven't
validated yet.
- rejectJoinInDefinedSql(node.getQuery());
+ // The engine (SSE vs MSE) is the registered handler's choice — not the
compiler's. Extract the
+ // verbatim AS-clause text, then let the handler validate it for its
target engine (the default
+ // handler re-compiles it as a single-stage Pinot query; an MSE handler
does a multi-stage check).
+ // Done before column resolution / schema inference so the single-source
inferer cannot be fed a
+ // definition the handler hasn't accepted.
+ MaterializedViewDdlHandler mvHandler = getMaterializedViewDdlHandler();
String definedSql = extractDefinedSql(originalSql, node.getQuery());
- verifyDefinedSqlIsParseable(definedSql);
+ mvHandler.validateDefinedQuery(node.getQuery(), definedSql, properties);
// Two paths:
// 1) Explicit column list — legacy path, will be deprecated once the
inferer matures.
@@ -539,6 +561,15 @@ public final class DdlCompiler {
// surfaces a clear message when it's absent.
List<ResolvedColumnDefinition> columns;
if (node.getColumns().getList().isEmpty()) {
+ // Schema inference from the AS <query> projection is
single-source-only. A handler whose
+ // definedSQL may be multi-source (e.g. a multi-stage-engine MV) reports
that it does not
+ // support inference, so an explicit column list is required.
+ if (!mvHandler.supportsSchemaInference(properties)) {
+ throw new DdlCompilationException(
+ "CREATE MATERIALIZED VIEW requires an explicit column list for
this materialized view; "
+ + "schema inference from the AS <query> projection is only
supported for "
+ + "single-source materialized views.");
+ }
// Fall back to the request-header database when the DDL itself does not
qualify the
// MV name — Calcite needs SOME database to resolve `FROM src` against,
and the
// header's intent ("operate on database X") matches where the MV will
be created.
@@ -598,10 +629,17 @@ public final class DdlCompiler {
TableConfigBuilder builder = new TableConfigBuilder(TableType.OFFLINE)
.setTableName(tableNameForConfig)
.setIsMaterializedView(true);
- MaterializedViewPropertyRouter.apply(properties, definedSql, schedule,
builder);
+ // The handler routes the MV properties onto the builder and returns the
minion task type it
+ // stamped (default: MaterializedViewTask). The consistency check below
uses that task type.
+ String mvTaskType = mvHandler.applyTaskConfig(properties, definedSql,
schedule, builder);
+ if (mvTaskType == null) {
+ throw new DdlCompilationException("MaterializedViewDdlHandler "
+ + mvHandler.getClass().getName() + " returned a null task type from
applyTaskConfig; it "
+ + "must return the task type it stamped onto the table config.");
+ }
TableConfig tableConfig = builder.build();
- validateMaterializedViewConsistency(schema, tableConfig);
+ validateMaterializedViewConsistency(schema, tableConfig, mvTaskType);
return new CompiledCreateMaterializedView(resolved.getDatabaseName(),
schema, tableConfig,
node.isIfNotExists(), warnings);
@@ -734,64 +772,10 @@ public final class DdlCompiler {
+ sql.length() + ".");
}
- /// Walks the parsed AS-clause AST and throws a clear, MV-specific error if
any JOIN node
- /// is found at any depth (top-level FROM, subquery FROM, lateral join,
etc.). See the
- /// call-site comment in [#compileCreateMaterializedView] for *why* we do
this against the
- /// AST rather than letting the downstream re-parse / analyzer surface the
limitation.
- private static void rejectJoinInDefinedSql(SqlNode queryNode) {
- if (containsJoin(queryNode)) {
- throw new DdlCompilationException(
- "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
- + "Materialized views currently read from a single source table;
"
- + "pre-join the inputs into a base table and reference that
single table "
- + "in the MV definition.");
- }
- }
-
- /// Returns true iff `node` or any descendant is a [SqlKind#JOIN] call. We
walk via the
- /// `SqlCall#getOperandList` / `SqlNodeList` axes so the traversal stays
wrapper-agnostic
- /// (SqlOrderBy, SqlWith, SqlExplain) — mirroring how [#collectPositions]
walks the tree.
- /// Leaves (SqlLiteral, SqlIdentifier, ...) terminate the recursion
naturally.
- private static boolean containsJoin(@Nullable SqlNode node) {
- if (node == null) {
- return false;
- }
- if (node.getKind() == SqlKind.JOIN) {
- return true;
- }
- if (node instanceof SqlCall) {
- for (SqlNode child : ((SqlCall) node).getOperandList()) {
- if (containsJoin(child)) {
- return true;
- }
- }
- } else if (node instanceof SqlNodeList) {
- for (SqlNode child : (SqlNodeList) node) {
- if (containsJoin(child)) {
- return true;
- }
- }
- }
- return false;
- }
-
- /// Sanity check: the substring we extracted must be a standalone parseable
Pinot query.
- /// This guards against DDL-layer slicing bugs (off-by-one in
[#lineColToOffset], parser
- /// position quirks) so the error surfaces in the DDL layer rather than at
first scheduler
- /// tick or at create-time analysis (PR 3).
- private static void verifyDefinedSqlIsParseable(String definedSql) {
- try {
- CalciteSqlParser.compileToPinotQuery(definedSql);
- } catch (Exception e) {
- throw new DdlCompilationException(
- "AS <query> did not re-parse as a Pinot query: " + e.getMessage()
- + " (extracted text: " + definedSql + ")", e);
- }
- }
-
/// Cross-checks: `timeColumnName` must reference a declared DATETIME
column, and
/// `bucketTimePeriod` must be present so the scheduler has a window size.
- private static void validateMaterializedViewConsistency(Schema schema,
TableConfig tableConfig) {
+ private static void validateMaterializedViewConsistency(Schema schema,
TableConfig tableConfig,
+ String mvTaskType) {
String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
if (timeColumnName == null || timeColumnName.isEmpty()) {
throw new DdlCompilationException(
@@ -809,9 +793,18 @@ public final class DdlCompiler {
}
Map<String, String> mvTaskConfig = tableConfig.getTaskConfig() == null
? null
- :
tableConfig.getTaskConfig().getConfigsForTaskType(MaterializedViewTask.TASK_TYPE);
- if (mvTaskConfig == null
- ||
!mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) {
+ : tableConfig.getTaskConfig().getConfigsForTaskType(mvTaskType);
+ if (mvTaskConfig == null) {
+ // The handler's applyTaskConfig returned this task type but did not
stamp a matching task
+ // config onto the builder — a handler-contract violation, not a user
error. Surface it as
+ // such so a custom MaterializedViewDdlHandler author gets an actionable
diagnostic rather
+ // than the misleading "bucketTimePeriod missing" message below.
+ throw new DdlCompilationException(
+ "MaterializedViewDdlHandler returned task type '" + mvTaskType + "'
from applyTaskConfig "
+ + "but did not stamp a task config under it; the returned task
type must match the "
+ + "task config written to the table.");
+ }
+ if
(!mvTaskConfig.containsKey(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY)) {
throw new DdlCompilationException(
"CREATE MATERIALIZED VIEW requires a 'bucketTimePeriod' property
(e.g. '1d', '1h'); "
+ "it defines the time window each refresh tick materializes.");
diff --git
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
new file mode 100644
index 00000000000..5d2e3c55b09
--- /dev/null
+++
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/DefaultMaterializedViewDdlHandler.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+
+
+/// Default, single-stage-engine (SSE) materialized-view DDL handler.
+///
+/// A materialized view reads from exactly one source table: a JOIN in the `AS <query>` clause is
+/// rejected, and the MV task configuration is routed under the built-in `MaterializedViewTask` task
+/// type. This is the behavior of OSS Pinot when no alternative handler is registered.
+public class DefaultMaterializedViewDdlHandler implements MaterializedViewDdlHandler {
+
+  /// Rejects any JOIN in the parsed `AS <query>` AST, then re-compiles the extracted `definedSql`
+  /// as a single-stage Pinot query.
+  ///
+  /// @throws DdlCompilationException if the definition contains a JOIN or does not compile as a
+  ///     single-stage Pinot query (the underlying exception is kept as the cause)
+  @Override
+  public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties) {
+    if (MaterializedViewDdlHandler.containsJoin(queryNode)) {
+      throw new DdlCompilationException(
+          "CREATE MATERIALIZED VIEW does not support JOIN in the AS clause. "
+              + "Materialized views currently read from a single source table; "
+              + "pre-join the inputs into a base table and reference that single table "
+              + "in the MV definition.");
+    }
+    // Re-compile the extracted definedSQL as a single-stage Pinot query. This both guards against
+    // DDL-layer slicing bugs (off-by-one in the parser-position extraction) and enforces SSE
+    // compatibility, so the error surfaces here rather than at the first scheduler tick / analysis.
+    // (A plain `//` comment: a `///` doc comment on a statement is a dangling doc comment.)
+    try {
+      CalciteSqlParser.compileToPinotQuery(definedSql);
+    } catch (Exception e) {
+      throw new DdlCompilationException(
+          "AS <query> did not re-parse as a single-stage Pinot query: " + e.getMessage()
+              + " (extracted text: " + definedSql + ")", e);
+    }
+  }
+
+  /// Routes the MV properties through [MaterializedViewPropertyRouter] under the built-in task type.
+  ///
+  /// @return always `MaterializedViewTask.TASK_TYPE`, the key the router writes the MV config under
+  @Override
+  public String applyTaskConfig(Map<String, String> properties, String definedSql,
+      @Nullable String schedule, TableConfigBuilder builder) {
+    MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder);
+    return MaterializedViewTask.TASK_TYPE;
+  }
+}
diff --git
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
new file mode 100644
index 00000000000..303586501ee
--- /dev/null
+++
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandler.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+
+
+/// Extension point that compiles `CREATE MATERIALIZED VIEW ... AS <query>` statements.
+///
+/// Either query engine may execute a materialized view's `definedSQL`. Which one is targeted, and
+/// therefore how the definition is validated, is decided by the handler rather than the compiler.
+/// The default, [DefaultMaterializedViewDdlHandler], targets the **single-stage engine (SSE)**: it
+/// checks that `definedSQL` compiles as a single-stage Pinot query (which limits it to one source
+/// table) and routes the MV task configuration under the built-in `MaterializedViewTask` task type.
+///
+/// A downstream distribution may install a different handler through
+/// [DdlCompiler#setMaterializedViewDdlHandler] to target the **multi-stage engine (MSE)**, accepting
+/// multi-source / JOIN definitions and routing them under another minion task type. The controller
+/// validates the produced `TableConfig` (running the task generator's validation) *before*
+/// persisting it. An MSE handler MUST therefore stamp a task type whose generator can validate such a
+/// definition. Stamping the built-in `MaterializedViewTask` on a JOIN would be rejected by the
+/// single-source [org.apache.pinot.materializedview.analysis.MaterializedViewAnalyzer].
+///
+/// **Contract for non-built-in task types.** A handler stamping any task type other than the
+/// built-in `MaterializedViewTask` owns the *entire* runtime of that task type. That covers not just
+/// the minion task generator / executor that materializes the view, but also its definition-metadata
+/// persistence and consistency tracking. The built-in controller-side MV machinery
+/// (`MaterializedViewDefinitionMetadata` persistence and the `MaterializedViewConsistencyManager`) is
+/// keyed on `MaterializedViewTask`. It therefore manages built-in MVs only and deliberately skips MVs
+/// stamped with another task type. The `isMaterializedView=true` flag and the DDL lifecycle still
+/// apply to every MV. This keeps the built-in single-source freshness model away from definitions it
+/// cannot reason about (e.g. a multi-source JOIN) and leaves freshness/consistency to the owning
+/// task type.
+public interface MaterializedViewDdlHandler {
+
+  /// Validates the MV's `AS <query>` clause against the target engine. Invoked before column
+  /// resolution and schema inference. All engine-specific checks belong here. The default (SSE)
+  /// handler re-compiles `definedSql` as a single-stage Pinot query (rejecting multi-source / JOIN
+  /// queries); an MSE handler runs a multi-stage-compatible check instead. When the definition is
+  /// unsupported, throw [DdlCompilationException] with a message the user can act on.
+  ///
+  /// @param queryNode the parsed `AS <query>` SqlNode ([#containsJoin] helps with AST-level checks)
+  /// @param definedSql the verbatim `AS <query>` text extracted from the DDL (re-parse it to catch
+  ///     DDL-layer slicing bugs and to enforce engine compatibility)
+  /// @param properties the MV's `PROPERTIES (...)` map (raw keys/values as typed by the user)
+  void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties);
+
+  /// Reports whether the MV schema may be inferred from the `AS <query>` projection when the DDL does
+  /// not list columns explicitly. The default answer is {@code true} (single-source projection
+  /// inference). A handler whose `definedSQL` can be multi-source, where projection inference is
+  /// unsupported, answers {@code false}, and the compiler then demands an explicit column list.
+  ///
+  /// @param properties the MV's `PROPERTIES (...)` map
+  /// @return whether projection-based schema inference is permitted
+  default boolean supportsSchemaInference(Map<String, String> properties) {
+    return true;
+  }
+
+  /// Writes the MV's `PROPERTIES (...)`, together with the synthetic `definedSql` / `schedule`
+  /// entries, onto `builder` and reports which minion task type was stamped. The returned value must
+  /// be non-null and equal to the task-type key written to `builder`. The compiler uses it to validate
+  /// MV consistency (for example, that `bucketTimePeriod` is present under that task type).
+  ///
+  /// @param properties the MV's `PROPERTIES (...)` map
+  /// @param definedSql the verbatim `AS <query>` text
+  /// @param schedule the Quartz cron derived from `REFRESH EVERY`, or {@code null} when absent
+  /// @param builder the table-config builder to populate (name and isMaterializedView already set)
+  /// @return the minion task type stamped onto the builder (e.g. {@code MaterializedViewTask})
+  String applyTaskConfig(Map<String, String> properties, String definedSql, @Nullable String schedule,
+      TableConfigBuilder builder);
+
+  /// Reports whether `node` or any of its descendants is a [SqlKind#JOIN] call. Children are reached
+  /// through `SqlCall#getOperandList` and `SqlNodeList` iteration, which keeps the walk independent of
+  /// wrapper nodes (SqlOrderBy, SqlWith, SqlExplain). Leaf nodes (SqlLiteral, SqlIdentifier, ...) and
+  /// `null` operands end the recursion.
+  ///
+  /// @param node the root of the subtree to inspect; may be `null`
+  /// @return `true` iff a JOIN node appears anywhere in the subtree
+  static boolean containsJoin(@Nullable SqlNode node) {
+    if (node == null) {
+      return false;
+    }
+    if (node.getKind() == SqlKind.JOIN) {
+      return true;
+    }
+    Iterable<SqlNode> children;
+    if (node instanceof SqlCall) {
+      children = ((SqlCall) node).getOperandList();
+    } else if (node instanceof SqlNodeList) {
+      children = (SqlNodeList) node;
+    } else {
+      // Leaf node: nothing further to inspect.
+      return false;
+    }
+    for (SqlNode child : children) {
+      if (containsJoin(child)) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
index 32f3269ff28..9c2eaf0e560 100644
---
a/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
+++
b/pinot-sql-ddl/src/main/java/org/apache/pinot/sql/ddl/compile/MaterializedViewPropertyRouter.java
@@ -139,6 +139,16 @@ public final class MaterializedViewPropertyRouter {
/// * The caller has already validated `properties` keys do not duplicate
(case-insensitive).
public static void apply(Map<String, String> properties, String definedSql,
@Nullable String schedule, TableConfigBuilder builder) {
+ // Backward-compatible overload: routes the MV task config under the built-in MaterializedViewTask type.
+ apply(properties, definedSql, schedule, builder,
MaterializedViewTask.TASK_TYPE);
+ }
+
+ /// Same as [#apply(Map, String, String, TableConfigBuilder)] but stores the
routed MV task config
+ /// under the given `taskType` instead of the built-in
`MaterializedViewTask`. Used by alternative
+ /// [MaterializedViewDdlHandler]s that materialize the MV via a different
minion task type (e.g. a
+ /// multi-stage-engine generator). Bare task-property keys (e.g.
`bucketTimePeriod`) route the same
+ /// way regardless of `taskType`; only the task type the config is stored
under changes.
+ public static void apply(Map<String, String> properties, String definedSql,
+ @Nullable String schedule, TableConfigBuilder builder, String
mvTaskType) {
Map<String, Map<String, String>> taskConfigs = new LinkedHashMap<>();
Map<String, String> customConfigs = new LinkedHashMap<>();
Map<String, String> mvTaskConfig = new LinkedHashMap<>();
@@ -210,7 +220,11 @@ public final class MaterializedViewPropertyRouter {
}
String taskType = afterPrefix.substring(0, dot);
String taskKey = afterPrefix.substring(dot + 1);
- if (MaterializedViewTask.TASK_TYPE.equals(taskType)
+ // Compare against the MV's own task type (mvTaskType), not the
hard-coded built-in, so a
+ // custom handler's task type is recognized as "this MV's task config"
rather than treated
+ // as an unrelated composed task type (which would be dropped by the
put(mvTaskType, ...)
+ // below).
+ if (mvTaskType.equals(taskType)
&& (SCHEDULE_KEY.equals(taskKey.toLowerCase(Locale.ROOT))
||
MaterializedViewTask.DEFINED_SQL_KEY.equalsIgnoreCase(taskKey))) {
throw new DdlCompilationException(
@@ -218,9 +232,9 @@ public final class MaterializedViewPropertyRouter {
+ "(REFRESH EVERY for 'schedule', AS <query> for
'definedSQL'); "
+ "remove it from PROPERTIES.");
}
- if (MaterializedViewTask.TASK_TYPE.equals(taskType)) {
+ if (mvTaskType.equals(taskType)) {
// Canonicalize the knob casing the same way the bare-form branch
does, so
- // `task.MaterializedViewTask.BUCKETTIMEPERIOD` and bare
`BUCKETTIMEPERIOD` end up
+ // `task.<mvTaskType>.BUCKETTIMEPERIOD` and bare `BUCKETTIMEPERIOD`
end up
// under the same on-wire key (the constant casing in
CommonConstants).
String canonical =
TASK_CONFIG_KEYS.getOrDefault(taskKey.toLowerCase(Locale.ROOT), taskKey);
mvTaskConfig.put(canonical, value);
@@ -244,7 +258,7 @@ public final class MaterializedViewPropertyRouter {
mvTaskConfig.put(SCHEDULE_KEY, schedule);
}
- taskConfigs.put(MaterializedViewTask.TASK_TYPE, mvTaskConfig);
+ taskConfigs.put(mvTaskType, mvTaskConfig);
builder.setTaskConfig(new TableTaskConfig(taskConfigs));
if (!customConfigs.isEmpty()) {
diff --git
a/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java
b/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java
new file mode 100644
index 00000000000..300e08f48c5
--- /dev/null
+++
b/pinot-sql-ddl/src/test/java/org/apache/pinot/sql/ddl/compile/MaterializedViewDdlHandlerTest.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.sql.ddl.compile;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants.MaterializedViewTask;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
+
+
+/// Tests the [MaterializedViewDdlHandler] extension point: a registered handler can accept a JOIN in
+/// the `AS` clause and route the MV task config under an alternative task type, while the default
+/// handler preserves single-source / single-stage behavior.
+///
+/// The handler is installed process-wide via `DdlCompiler.setMaterializedViewDdlHandler`, so every
+/// test that swaps it relies on `restoreDefaultHandler` to reinstall the default afterwards.
+public class MaterializedViewDdlHandlerTest {
+  private static final String JOIN_DDL =
+      "CREATE MATERIALIZED VIEW mv ("
+          + " ts TIMESTAMP DATETIME FORMAT '1:MILLISECONDS:TIMESTAMP' GRANULARITY '1:DAYS',"
+          + " country STRING,"
+          + " amount_sum DOUBLE METRIC"
+          + ")"
+          + " REFRESH EVERY 1 DAY"
+          + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d')"
+          + " AS SELECT f.ts, d.country, SUM(f.amount) AS amount_sum"
+          + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts, d.country";
+
+  /// Reinstalls the default handler after every test method. `alwaysRun = true` keeps the restore from
+  /// being skipped (e.g. by group filtering), which would leak a custom handler into sibling tests.
+  @AfterMethod(alwaysRun = true)
+  public void restoreDefaultHandler() {
+    // The handler is process-wide; restore the default so sibling tests see single-source behavior.
+    DdlCompiler.setMaterializedViewDdlHandler(new DefaultMaterializedViewDdlHandler());
+  }
+
+  @Test
+  public void defaultsToSingleSourceHandler() {
+    MaterializedViewDdlHandler handler = DdlCompiler.getMaterializedViewDdlHandler();
+    assertTrue(handler instanceof DefaultMaterializedViewDdlHandler, String.valueOf(handler));
+  }
+
+  @Test
+  public void defaultHandlerRejectsJoin() {
+    DdlCompilationException e =
+        expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(JOIN_DDL));
+    assertTrue(e.getMessage().contains("JOIN"), e.getMessage());
+  }
+
+  @Test
+  public void registeredHandlerAcceptsJoinAndRoutesToCustomTaskType() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler());
+
+    CompiledDdl compiled = DdlCompiler.compile(JOIN_DDL);
+    assertEquals(compiled.getOperation(), DdlOperation.CREATE_MATERIALIZED_VIEW);
+    TableConfig tableConfig = ((CompiledCreateMaterializedView) compiled).getTableConfig();
+    assertTrue(tableConfig.isMaterializedView());
+
+    Map<String, Map<String, String>> taskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap();
+    // Stamped under the handler's task type; the built-in single-stage type is absent.
+    assertTrue(taskTypes.containsKey(MultiSourceTestHandler.TASK_TYPE));
+    assertFalse(taskTypes.containsKey(MaterializedViewTask.TASK_TYPE));
+
+    Map<String, String> mvTaskConfig = taskTypes.get(MultiSourceTestHandler.TASK_TYPE);
+    assertEquals(mvTaskConfig.get(MaterializedViewTask.DEFINED_SQL_KEY),
+        "SELECT f.ts, d.country, SUM(f.amount) AS amount_sum"
+            + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts, d.country");
+    // bucketTimePeriod routed under the custom task type, and the consistency check passed against it.
+    assertEquals(mvTaskConfig.get(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY), "1d");
+  }
+
+  @Test
+  public void customTaskTypePrefixedKnobsArePreserved() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler());
+
+    // A `task.<customTaskType>.<knob>` property must survive into the stamped task config (it must
+    // not be overwritten by the synthetic task-config map under the same task type).
+    CompiledDdl compiled = DdlCompiler.compile(
+        "CREATE MATERIALIZED VIEW mv ("
+            + " ts TIMESTAMP DATETIME FORMAT '1:MILLISECONDS:TIMESTAMP' GRANULARITY '1:DAYS',"
+            + " amount_sum DOUBLE METRIC"
+            + ")"
+            + " REFRESH EVERY 1 DAY"
+            + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d',"
+            + " 'task." + MultiSourceTestHandler.TASK_TYPE + ".customKnob' = 'v')"
+            + " AS SELECT f.ts, SUM(f.amount) AS amount_sum"
+            + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts");
+    Map<String, String> mvTaskConfig = ((CompiledCreateMaterializedView) compiled).getTableConfig()
+        .getTaskConfig().getTaskTypeConfigsMap().get(MultiSourceTestHandler.TASK_TYPE);
+    assertEquals(mvTaskConfig.get("customKnob"), "v");
+    // Synthetic / bare-form keys remain present alongside the prefixed knob.
+    assertEquals(mvTaskConfig.get(MaterializedViewTask.BUCKET_TIME_PERIOD_KEY), "1d");
+    assertTrue(mvTaskConfig.containsKey(MaterializedViewTask.DEFINED_SQL_KEY));
+  }
+
+  @Test
+  public void customTaskTypePrefixedReservedKeysRejected() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler());
+
+    // `schedule` and `definedSQL` are owned by REFRESH EVERY / AS. The router must recognize them under
+    // the handler's own task-type prefix too, not only under the built-in MaterializedViewTask prefix.
+    for (String reservedKey : new String[]{"schedule", MaterializedViewTask.DEFINED_SQL_KEY}) {
+      String ddl = "CREATE MATERIALIZED VIEW mv ("
+          + " ts TIMESTAMP DATETIME FORMAT '1:MILLISECONDS:TIMESTAMP' GRANULARITY '1:DAYS',"
+          + " amount_sum DOUBLE METRIC"
+          + ")"
+          + " REFRESH EVERY 1 DAY"
+          + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d',"
+          + " 'task." + MultiSourceTestHandler.TASK_TYPE + "." + reservedKey + "' = 'x')"
+          + " AS SELECT f.ts, SUM(f.amount) AS amount_sum"
+          + " FROM fact f JOIN dim d ON f.id = d.id GROUP BY f.ts";
+      DdlCompilationException e =
+          expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(ddl));
+      assertTrue(e.getMessage().contains("remove it from PROPERTIES"), reservedKey + ": " + e.getMessage());
+    }
+  }
+
+  @Test
+  public void registeredHandlerJoinWithoutExplicitColumnsRejected() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MultiSourceTestHandler());
+
+    // Even when the handler permits a JOIN, schema inference (no column list) is single-source-only.
+    DdlCompilationException e = expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(
+        "CREATE MATERIALIZED VIEW mv"
+            + " REFRESH EVERY 1 DAY"
+            + " PROPERTIES ('timeColumnName' = 'ts', 'bucketTimePeriod' = '1d')"
+            + " AS SELECT f.ts, d.country FROM fact f JOIN dim d ON f.id = d.id"));
+    assertTrue(e.getMessage().contains("explicit column list"), e.getMessage());
+  }
+
+  @Test
+  public void handlerReturningNullTaskTypeFailsClearly() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MaterializedViewDdlHandler() {
+      @Override
+      public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties) {
+      }
+
+      @Override
+      public String applyTaskConfig(Map<String, String> properties, String definedSql,
+          @Nullable String schedule, TableConfigBuilder builder) {
+        return null;
+      }
+    });
+    DdlCompilationException e =
+        expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(JOIN_DDL));
+    assertTrue(e.getMessage().contains("null task type"), e.getMessage());
+  }
+
+  @Test
+  public void handlerReturningUnstampedTaskTypeFailsClearly() {
+    DdlCompiler.setMaterializedViewDdlHandler(new MaterializedViewDdlHandler() {
+      @Override
+      public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties) {
+      }
+
+      @Override
+      public String applyTaskConfig(Map<String, String> properties, String definedSql,
+          @Nullable String schedule, TableConfigBuilder builder) {
+        // Stamp under one task type but return a different one — a handler-contract violation.
+        MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder, "StampedTask");
+        return "ReturnedButNotStampedTask";
+      }
+    });
+    DdlCompilationException e =
+        expectThrows(DdlCompilationException.class, () -> DdlCompiler.compile(JOIN_DDL));
+    assertTrue(e.getMessage().contains("did not stamp a task config"), e.getMessage());
+  }
+
+  /// Test handler that permits multi-source (JOIN) definitions and routes the MV task config under a
+  /// distinct task type via the task-type-parameterized [MaterializedViewPropertyRouter#apply].
+  private static final class MultiSourceTestHandler implements MaterializedViewDdlHandler {
+    static final String TASK_TYPE = "CustomMaterializedViewTask";
+
+    @Override
+    public void validateDefinedQuery(SqlNode queryNode, String definedSql, Map<String, String> properties) {
+      // Multi-source allowed: no JOIN rejection, no single-stage re-compilation.
+    }
+
+    @Override
+    public boolean supportsSchemaInference(Map<String, String> properties) {
+      // Multi-source projection inference is not supported; an explicit column list is required.
+      return false;
+    }
+
+    @Override
+    public String applyTaskConfig(Map<String, String> properties, String definedSql,
+        @Nullable String schedule, TableConfigBuilder builder) {
+      MaterializedViewPropertyRouter.apply(properties, definedSql, schedule, builder, TASK_TYPE);
+      return TASK_TYPE;
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]