This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 37a442c0ae57 [SPARK-56920][SQL][FOLLOWUP] Add CreateMetricView logical
plan and pre-parse inputColumns
37a442c0ae57 is described below
commit 37a442c0ae57d1b05008a67252e14efa21705897
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu May 21 18:27:08 2026 +0800
[SPARK-56920][SQL][FOLLOWUP] Add CreateMetricView logical plan and
pre-parse inputColumns
### What changes were proposed in this pull request?
Two refactors on top of SPARK-56920 that make the metric-view plan shape
more amenable to downstream extension and simpler for resolvers.
**1. Introduce `CreateMetricView` logical plan as the parser's return
type.**
- Previously `CreateMetricViewCommand` doubled as both the parser output
and the V1 runnable command. The V2 strategy pattern-matched on it for
non-session catalogs, while the V1 path executed via `.run()`.
- Now the parser returns `CreateMetricView` (a `UnaryCommand`); for the
session catalog `ResolveSessionCatalog` rewrites it to
`CreateMetricViewCommand` (V1 runnable); for non-session v2 catalogs
`DataSourceV2Strategy` continues to dispatch to `CreateV2MetricViewExec`.
- This gives the parser a single, v1/v2-agnostic logical shape and frees
`CreateMetricViewCommand` to be V1-execution-only.
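
The dispatch described above can be sketched with toy types. This is a minimal illustration, not Spark code: `Catalog`, `Plan`, `CreateMetricViewPlan`, `V1CreateCommand`, `V2CreateExec`, and `Dispatch` are hypothetical stand-ins for the real classes, reduced to the fields needed to show the routing.

```scala
// Toy stand-ins only; none of these are Spark's real classes.
sealed trait Catalog
case object SessionCatalog extends Catalog
final case class V2Catalog(name: String) extends Catalog

sealed trait Plan
// Catalog-agnostic node, analogous to what the parser returns.
final case class CreateMetricViewPlan(catalog: Catalog, viewName: String, yaml: String)
  extends Plan
// V1 runnable form, produced only for the session catalog.
final case class V1CreateCommand(viewName: String, yaml: String) extends Plan
// Physical node for non-session v2 catalogs.
final case class V2CreateExec(catalogName: String, viewName: String, yaml: String)

object Dispatch {
  // Analyzer-style rewrite: only session-catalog plans become the V1 command;
  // everything else flows through unchanged.
  def resolveSessionCatalog(plan: Plan): Plan = plan match {
    case CreateMetricViewPlan(SessionCatalog, name, yaml) => V1CreateCommand(name, yaml)
    case other => other
  }

  // Planner-style strategy: matches the logical node the rewrite left in place.
  def v2Strategy(plan: Plan): Option[V2CreateExec] = plan match {
    case CreateMetricViewPlan(V2Catalog(c), name, yaml) => Some(V2CreateExec(c, name, yaml))
    case _ => None
  }
}
```

Both rules match on the same logical node, so the parser in this sketch never has to know which catalog the view targets.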
**2. Pre-parse YAML expressions into `inputColumns` on
`MetricViewPlaceholder`.**
- `MetricViewPlaceholder.desc: MetricView` is replaced with `inputColumns:
Seq[InputColumn]`. `MetricViewPlanner.parseYAML` now populates parsed
`Expression` and column `Metadata` for each dimension/measure column.
`ResolveMetricView` reads pre-parsed expressions directly instead of re-parsing
from `desc.select`.
- `MetricViewPlanner.planWrite` returns the descriptor alongside the
placeholder (used only for property emission at CREATE time), so callers that
need it don't have to recover it from the placeholder.
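
The "parse once at planning time, read typed columns at resolution time" idea can be sketched as follows. This is an illustrative toy under stated assumptions: `RawColumn`, `Expr`, `InputCol`, `Planner`, and `Resolver` are hypothetical names, and the whitespace-splitting `parse` is a placeholder for a real SQL expression parser.

```scala
// Hypothetical stand-ins for the YAML descriptor columns.
sealed trait RawColumn { def name: String; def sql: String }
final case class RawDimension(name: String, sql: String) extends RawColumn
final case class RawMeasure(name: String, sql: String) extends RawColumn

// Hypothetical stand-in for a parsed expression.
final case class Expr(tokens: List[String])

// Typed, pre-parsed columns carried on the placeholder in this sketch.
sealed trait InputCol { def name: String; def expr: Expr }
final case class DimensionCol(name: String, expr: Expr) extends InputCol
final case class MeasureCol(name: String, expr: Expr) extends InputCol

object Planner {
  // Placeholder "parser": splits on whitespace. A real one would build an AST.
  def parse(sql: String): Expr = Expr(sql.trim.split("\\s+").toList)

  // Parsing happens exactly once, here, at planning time.
  def buildInputColumns(cols: Seq[RawColumn]): Seq[InputCol] = cols.map {
    case RawDimension(n, s) => DimensionCol(n, parse(s))
    case RawMeasure(n, s) => MeasureCol(n, parse(s))
  }
}

object Resolver {
  // The resolver holds no parser; it only partitions already-typed columns.
  def split(cols: Seq[InputCol]): (Seq[DimensionCol], Seq[MeasureCol]) = {
    val dims = cols.collect { case d: DimensionCol => d }
    val meas = cols.collect { case m: MeasureCol => m }
    (dims, meas)
  }
}
```

Because `Resolver` depends only on `InputCol`, the descriptor format could change in this sketch without touching resolution logic.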
### Why are the changes needed?
- Splitting the parser-output logical plan from the runnable command is a
widely adopted pattern in Spark — `CreateView` → `CreateViewCommand`,
`CreateTable` → V1/V2 runnable commands, `DropTable` → `DropTableCommand`/V2
drop, etc. Aligning metric views with this pattern lets future extensions
(e.g., schema modes, temp/materialized variants) add fields to the logical plan
without changing the runnable's shape, and gives downstream rules a single
match target to dispatch from.
- Carrying pre-parsed `inputColumns` on the placeholder gives a stable,
analyzer-friendly representation and decouples the resolver from the YAML
serde. The resolver no longer needs a `ParserInterface` field for re-parsing
expressions, and the per-column metadata conversion happens once at planning
time.
### Does this PR introduce _any_ user-facing change?
No. Internal refactor only.
### How was this patch tested?
Existing test suites pass locally:
- `MetricViewV2CatalogSuite` (31/31)
- `SimpleMetricViewSuite` (19/19)
- `MetricViewFactorySuite` (16/16)
### Was this patch authored or co-authored using generative AI tooling?
Co-authored using Claude Code.
Closes #56010 from cloud-fan/SPARK-54119-followup.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/metricview/logical/metricViewNodes.scala | 56 +++++++++++++++--
.../sql/metricview/util/MetricViewPlanner.scala | 39 +++++++++---
.../sql/catalyst/analysis/ResolveMetricView.scala | 71 +++++++++-------------
.../catalyst/analysis/ResolveSessionCatalog.scala | 15 +++++
.../spark/sql/execution/SparkSqlParser.scala | 3 +-
.../sql/execution/command/metricViewCommands.scala | 14 ++---
.../datasources/v2/CreateV2MetricViewExec.scala | 4 +-
.../datasources/v2/DataSourceV2Strategy.scala | 9 ++-
.../sql/execution/MetricViewV2CatalogSuite.scala | 7 ++-
9 files changed, 150 insertions(+), 68 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
index a7fa037a4b33..76163794db33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
@@ -19,14 +19,62 @@ package org.apache.spark.sql.metricview.logical
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand, UnaryNode}
import org.apache.spark.sql.catalyst.trees.TreePattern.{METRIC_VIEW_PLACEHOLDER, RESOLVED_METRIC_VIEW, TreePattern}
-import org.apache.spark.sql.metricview.serde.MetricView
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * A parsed metric-view column, populated by
+ * [[org.apache.spark.sql.metricview.util.MetricViewPlanner]] from the YAML definition before the
+ * placeholder is handed to the analyzer. Carrying the parsed [[Expression]] (rather than the raw
+ * YAML descriptor) lets downstream resolution rules read a stable, analyzer-friendly
+ * representation without re-parsing.
+ */
+sealed trait InputColumn {
+ def name: String
+ def expr: Expression
+ def metadata: Metadata
+}
+
+case class DimensionInputColumn(
+ name: String,
+ expr: Expression,
+ metadata: Metadata) extends InputColumn
+
+case class MeasureInputColumn(
+ name: String,
+ expr: Expression,
+ metadata: Metadata) extends InputColumn
+
+/**
+ * Logical plan for `CREATE VIEW ... WITH METRICS`. This is the v1/v2-agnostic representation
+ * the parser returns; downstream analysis decides which runnable form it becomes:
+ * - For the session catalog: [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]]
+ * via an analyzer rule that fires once the identifier is resolved.
+ * - For non-session v2 [[org.apache.spark.sql.connector.catalog.ViewCatalog]]s: a
+ * `CreateV2MetricViewExec` produced by `DataSourceV2Strategy`.
+ *
+ * Splitting this from the runnable command lets the parser return a single logical shape
+ * regardless of target catalog (instead of pre-committing to a runnable command at parse
+ * time), and gives downstream rules a single match target to dispatch from.
+ */
+case class CreateMetricView(
+ child: LogicalPlan,
+ userSpecifiedColumns: Seq[(String, Option[String])],
+ comment: Option[String],
+ properties: Map[String, String],
+ originalText: String,
+ allowExisting: Boolean,
+ replace: Boolean) extends UnaryCommand {
+ override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = {
+ copy(child = newChild)
+ }
+}
case class MetricViewPlaceholder(
metadata: CatalogTable,
- desc: MetricView,
+ inputColumns: Seq[InputColumn],
outputMetrics: Seq[Attribute],
child: LogicalPlan,
isCreate: Boolean = false) extends UnaryNode {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
index 2ac20ebeaa95..ed7713819c15 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
@@ -24,24 +24,26 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.metricview.logical.MetricViewPlaceholder
-import org.apache.spark.sql.metricview.serde.{AssetSource, MetricView, MetricViewFactory, MetricViewValidationException, MetricViewYAMLParsingException, SQLSource}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.metricview.logical.{DimensionInputColumn, InputColumn, MeasureInputColumn, MetricViewPlaceholder}
+import org.apache.spark.sql.metricview.serde.{AssetSource, DimensionExpression, JsonUtils, MeasureExpression, MetricView, MetricViewFactory, MetricViewValidationException, MetricViewYAMLParsingException, SQLSource}
+import org.apache.spark.sql.types.{Metadata, StructType}
object MetricViewPlanner {
def planWrite(
metadata: CatalogTable,
yaml: String,
- sqlParser: ParserInterface): MetricViewPlaceholder = {
+ sqlParser: ParserInterface): (MetricViewPlaceholder, MetricView) = {
val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
- MetricViewPlaceholder(
+ val inputColumns = buildInputColumns(metricView, sqlParser)
+ val placeholder = MetricViewPlaceholder(
metadata,
- metricView,
+ inputColumns,
Seq.empty,
dataModelPlan,
isCreate = true
)
+ (placeholder, metricView)
}
def planRead(
@@ -50,14 +52,37 @@ object MetricViewPlanner {
sqlParser: ParserInterface,
expectedSchema: StructType): MetricViewPlaceholder = {
val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+ val inputColumns = buildInputColumns(metricView, sqlParser)
MetricViewPlaceholder(
metadata,
- metricView,
+ inputColumns,
DataTypeUtils.toAttributes(expectedSchema),
dataModelPlan
)
}
+ /**
+ * Parses every column's `MeasureExpression` / `DimensionExpression` from the YAML descriptor
+ * into a typed [[InputColumn]] (with the SQL expression already parsed) so downstream
+ * resolution rules read a stable representation rather than re-parsing the YAML.
+ * Column metadata is converted once here from the canonical `ColumnMetadata` to Spark's
+ * `Metadata`, preserving the per-column annotations (e.g. dimension / measure type marker,
+ * source expression text) the resolver attaches to output attributes.
+ */
+ private def buildInputColumns(
+ metricView: MetricView,
+ sqlParser: ParserInterface): Seq[InputColumn] = {
+ metricView.select.map { col =>
+ val md = Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata))
+ col.expression match {
+ case DimensionExpression(expr) =>
+ DimensionInputColumn(col.name, sqlParser.parseExpression(expr), md)
+ case MeasureExpression(expr) =>
+ MeasureInputColumn(col.name, sqlParser.parseExpression(expr), md)
+ }
+ }
+ }
+
private def parseYAML(
yaml: String,
sqlParser: ParserInterface): (MetricView, LogicalPlan) = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
index a7763c2799a6..760a3d09f467 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
@@ -23,13 +23,11 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Measure}
-import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern.METRIC_VIEW_PLACEHOLDER
-import org.apache.spark.sql.metricview.logical.{MetricViewPlaceholder, ResolvedMetricView}
-import org.apache.spark.sql.metricview.serde.{Column => CanonicalColumn, DimensionExpression, JsonUtils, MeasureExpression, MetricView => CanonicalMetricView}
-import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder}
+import org.apache.spark.sql.metricview.logical.{DimensionInputColumn, InputColumn, MeasureInputColumn, MetricViewPlaceholder, ResolvedMetricView}
+import org.apache.spark.sql.types.DataType
/**
* Analysis rule for resolving metric view operations (CREATE and SELECT).
@@ -165,7 +163,6 @@ import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder}
* into resolved logical plans that can be further optimized and executed.
*/
case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
- private def parser: ParserInterface = session.sessionState.sqlParser
override def apply(plan: LogicalPlan): LogicalPlan = {
if (!plan.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
return plan
@@ -176,7 +173,7 @@ case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
// are aggregate functions, we need to use an Aggregate node and group by all
// dimensions to get the output schema.
case mvp: MetricViewPlaceholder if mvp.isCreate && mvp.child.resolved =>
- val (dimensions, measures) = buildMetricViewOutput(mvp.desc)
+ val (dimensions, measures) = buildMetricViewOutput(mvp.inputColumns)
Aggregate(
// group by all dimensions
dimensions.map(_.toAttribute).toSeq,
@@ -190,9 +187,11 @@ case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
// Resolve the Aggregate node based on the metric view output and then replace
// the AttributeReference of the metric view output to the actual expressions.
case node @ MetricViewReadOperation(metricView) =>
- // step 1: parse the metric view definition
+ // step 1: build typed dimension / measure columns from the placeholder's
+ // pre-parsed `inputColumns`. These were populated by `MetricViewPlanner` from the
+ // YAML descriptor so the resolver doesn't need to re-parse expressions here.
val (dimensions, measures) =
- parseMetricViewColumns(metricView.outputMetrics, metricView.desc.select)
+ buildMetricViewColumns(metricView.outputMetrics, metricView.inputColumns)
// step 2: build the Project node containing the dimensions
val dimensionExprs = dimensions.map(_.namedExpr)
@@ -235,50 +234,40 @@ case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
}
}
- private def buildMetricViewOutput(metricView: CanonicalMetricView)
+ /**
+ * Builds the named expressions used by the CREATE-time aggregate (so the analyzer can
+ * derive the output schema). Reads pre-parsed expressions from [[InputColumn]]s.
+ */
+ private def buildMetricViewOutput(inputColumns: Seq[InputColumn])
: (Seq[NamedExpression], Seq[NamedExpression]) = {
val dimensions = new mutable.ArrayBuffer[NamedExpression]()
val measures = new mutable.ArrayBuffer[NamedExpression]()
- metricView.select.foreach { col =>
- val metadata = new MetadataBuilder()
- .withMetadata(Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata)))
- .build()
- col.expression match {
- case DimensionExpression(expr) =>
- dimensions.append(
- Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata)))
- case MeasureExpression(expr) =>
- measures.append(
- Alias(parser.parseExpression(expr), col.name)(explicitMetadata = Some(metadata)))
- }
+ inputColumns.foreach {
+ case c: DimensionInputColumn =>
+ dimensions.append(Alias(c.expr, c.name)(explicitMetadata = Some(c.metadata)))
+ case c: MeasureInputColumn =>
+ measures.append(Alias(c.expr, c.name)(explicitMetadata = Some(c.metadata)))
}
(dimensions.toSeq, measures.toSeq)
}
- private def parseMetricViewColumns(
+ /**
+ * Pairs each pre-parsed [[InputColumn]] with the matching output attribute (by position)
+ * so the resolver can replace `MEASURE(<measure-name>)` references with the original
+ * aggregate expression while preserving exprId and data type.
+ */
+ private def buildMetricViewColumns(
metricViewOutput: Seq[Attribute],
- columns: Seq[CanonicalColumn]
+ inputColumns: Seq[InputColumn]
): (Seq[MetricViewDimension], Seq[MetricViewMeasure]) = {
val dimensions = new mutable.ArrayBuffer[MetricViewDimension]()
val measures = new mutable.ArrayBuffer[MetricViewMeasure]()
- metricViewOutput.zip(columns).foreach { case (attr, column) =>
- column.expression match {
- case DimensionExpression(expr) =>
- dimensions.append(
- MetricViewDimension(
- attr.name,
- parser.parseExpression(expr),
- attr.exprId,
- attr.dataType)
- )
- case MeasureExpression(expr) =>
- measures.append(
- MetricViewMeasure(
- attr.name,
- parser.parseExpression(expr),
- attr.exprId,
- attr.dataType)
- )
+ metricViewOutput.zip(inputColumns).foreach { case (attr, column) =>
+ column match {
+ case c: DimensionInputColumn =>
+ dimensions.append(MetricViewDimension(attr.name, c.expr, attr.exprId, attr.dataType))
+ case c: MeasureInputColumn =>
+ measures.append(MetricViewMeasure(attr.name, c.expr, attr.exprId, attr.dataType))
}
}
(dimensions.toSeq, measures.toSeq)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 8f0b664e10c5..8774feb4d911 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1,
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.connector.V1Function
+import org.apache.spark.sql.metricview.logical.CreateMetricView
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.util.SparkStringUtils
@@ -577,6 +578,20 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
viewType = PersistedView,
viewSchemaMode = viewSchemaMode)
+ // CREATE VIEW ... WITH METRICS on the session catalog -> V1 runnable command. Non-session
+ // v2 catalogs leave [[CreateMetricView]] in place for `DataSourceV2Strategy` to dispatch
+ // to `CreateV2MetricViewExec`.
+ case cm @ CreateMetricView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _)
+ if isSessionCatalog(catalog) =>
+ CreateMetricViewCommand(
+ cm.child,
+ cm.userSpecifiedColumns,
+ cm.comment,
+ cm.properties,
+ cm.originalText,
+ cm.allowExisting,
+ cm.replace)
+
// ViewCatalog catalogs are handled by the v2 strategy (enumerates via listViews); we skip
// the match here so the plan flows through unchanged. Only non-session, non-ViewCatalog
// catalogs hit the MISSING_CATALOG_ABILITY.VIEWS rejection.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index b4ece7329094..0ecbf7609d2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.metricview.logical.CreateMetricView
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.util.Utils.getUriBuilder
@@ -863,7 +864,7 @@ class SparkSqlAstBuilder extends AstBuilder {
.getOrElse(Map.empty)
val codeLiteral = visitCodeLiteral(ctx.codeLiteral())
- CreateMetricViewCommand(
+ CreateMetricView(
withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
userSpecifiedColumns,
visitCommentSpecList(ctx.commentSpec()),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
index 5937ad2300cd..3ae6dcad174f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
@@ -149,9 +149,8 @@ object MetricViewHelper {
/**
* Analyzes a metric-view YAML body so the create / alter path can capture the source plan
* and its dependencies. Returns the analyzed plan together with the parsed [[MetricView]]
- * descriptor (the latter is grabbed off the un-analyzed [[MetricViewPlaceholder]] before
- * the analyzer rewrites it away, so callers needing the descriptor for property emission
- * don't have to re-parse the YAML).
+ * descriptor (returned alongside the placeholder by [[MetricViewPlanner.planWrite]] so
+ * callers needing the descriptor for property emission don't have to re-parse the YAML).
*
* `nameParts` is the multi-part target identifier (catalog + namespace + table). The synthetic
* [[CatalogTable]] used as analysis context still carries a [[TableIdentifier]] (capped at
@@ -189,12 +188,11 @@ object MetricViewHelper {
schema = new StructType(),
viewOriginalText = Some(viewText),
viewText = Some(viewText))
- val placeholder = MetricViewPlanner.planWrite(
+ // `planWrite` returns the placeholder (carrying pre-parsed `inputColumns`) and the
+ // parsed YAML descriptor separately, so the caller can read the descriptor for
+ // property emission (e.g. `metric_view.*` keys) without keeping it on the placeholder.
+ val (placeholder, metricView) = MetricViewPlanner.planWrite(
tableMeta, viewText, session.sessionState.sqlParser)
- // Grab the parsed descriptor BEFORE analysis -- the placeholder gets replaced by
- // ResolvedMetricView during analyzer rules, after which `MetricView` is no longer
- // recoverable from the plan tree.
- val metricView = placeholder.desc
val analyzed = analyzer.executeAndCheck(placeholder, new QueryPlanningTracker)
ViewHelper.verifyTemporaryObjectsNotExists(
isTemporary = false, nameParts, analyzed, Seq.empty)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala
index d25239aac949..fb27e9feaa0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala
@@ -30,8 +30,8 @@ import org.apache.spark.sql.connector.catalog.{DependencyList, Identifier, Table
* METRIC_VIEW`) via the [[V2ViewPreparation]] hooks.
*
* Routed by [[DataSourceV2Strategy]] from
- * [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] when the resolved
- * catalog is a non-session v2 catalog.
+ * [[org.apache.spark.sql.metricview.logical.CreateMetricView]] when the resolved catalog
+ * is a non-session v2 catalog.
*/
case class CreateV2MetricViewExec(
catalog: ViewCatalog,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index ed067a3f00d1..4fd7d993cc3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -44,11 +44,12 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat
import org.apache.spark.sql.connector.write.{V1Write, Write}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, ScalarSubquery => ExecScalarSubquery, SparkPlan, SparkStrategy => Strategy}
-import org.apache.spark.sql.execution.command.{CommandUtils, CreateMetricViewCommand, MetricViewHelper}
+import org.apache.spark.sql.execution.command.{CommandUtils, MetricViewHelper}
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelationWithTable, PushableColumnAndNestedColumn}
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.metricview.logical.CreateMetricView
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.ArrayImplicits._
@@ -326,8 +327,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
// CREATE VIEW ... WITH METRICS on a non-session v2 catalog. Routes the metric-view path
// through `CreateV2MetricViewExec`, which extends `V2ViewPreparation` to share the
// `IF NOT EXISTS` short-circuit, `OR REPLACE`, and cross-type-collision decoding with
- // `CreateV2ViewExec`. Session-catalog dispatch stays in `CreateMetricViewCommand.run`.
- case CreateMetricViewCommand(
+ // `CreateV2ViewExec`. Session-catalog dispatch happens earlier in `ResolveSessionCatalog`,
+ // which rewrites `CreateMetricView` (the parser's v1/v2-agnostic logical plan) to
+ // `CreateMetricViewCommand` for v1 execution.
+ case CreateMetricView(
ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment, properties,
originalText, allowExisting, replace) if !CatalogV2Util.isSessionCatalog(catalog) =>
val viewCatalog = catalog match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala
index a90adb8fee61..b1f011c9cdac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala
@@ -29,8 +29,11 @@ import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.Metadata
/**
- * Tests that exercise [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
- * non-session V2 catalog. Metric views are persisted through the same [[ViewCatalog]] interface
+ * Tests that exercise [[org.apache.spark.sql.metricview.logical.CreateMetricView]] on a
+ * non-session V2 catalog (routed through
+ * [[org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy]] to
+ * [[org.apache.spark.sql.execution.datasources.v2.CreateV2MetricViewExec]]).
+ * Metric views are persisted through the same [[ViewCatalog]] interface
* as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE = METRIC_VIEW`
* plus the typed `viewDependencies` field on [[ViewInfo]]. The recording catalog used here is a
* minimal [[TableViewCatalog]] so the same instance can also host the source table referenced by