This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a442c0ae57 [SPARK-56920][SQL][FOLLOWUP] Add CreateMetricView logical 
plan and pre-parse inputColumns
37a442c0ae57 is described below

commit 37a442c0ae57d1b05008a67252e14efa21705897
Author: Wenchen Fan <[email protected]>
AuthorDate: Thu May 21 18:27:08 2026 +0800

    [SPARK-56920][SQL][FOLLOWUP] Add CreateMetricView logical plan and 
pre-parse inputColumns
    
    ### What changes were proposed in this pull request?
    
    Two refactors on top of SPARK-56920 that make the metric-view plan shape 
more amenable to downstream extension and simpler for resolvers.
    
    **1. Introduce `CreateMetricView` logical plan as the parser's return 
type.**
    
    - Previously `CreateMetricViewCommand` doubled as both the parser output 
and the V1 runnable command. The V2 strategy pattern-matched on it for 
non-session catalogs, while the V1 path executed via `.run()`.
    - Now the parser returns `CreateMetricView` (a `UnaryCommand`); for the 
session catalog `ResolveSessionCatalog` rewrites it to 
`CreateMetricViewCommand` (V1 runnable); for non-session v2 catalogs 
`DataSourceV2Strategy` continues to dispatch to `CreateV2MetricViewExec`.
    - This gives the parser a single, v1/v2-agnostic logical shape and frees 
`CreateMetricViewCommand` to be V1-execution-only.
    
    **2. Pre-parse YAML expressions into `inputColumns` on 
`MetricViewPlaceholder`.**
    
    - `MetricViewPlaceholder.desc: MetricView` is replaced with `inputColumns: 
Seq[InputColumn]`. `MetricViewPlanner.parseYAML` now populates parsed 
`Expression` and column `Metadata` for each dimension/measure column. 
`ResolveMetricView` reads pre-parsed expressions directly instead of re-parsing 
from `desc.select`.
    - `MetricViewPlanner.planWrite` returns the descriptor alongside the 
placeholder (used only for property emission at CREATE time), so callers that 
need it don't have to recover it from the placeholder.
    
    ### Why are the changes needed?
    
    - Splitting the parser-output logical plan from the runnable command is a 
widely adopted pattern in Spark — `CreateView` → `CreateViewCommand`, 
`CreateTable` → V1/V2 runnable commands, `DropTable` → `DropTableCommand`/V2 
drop, etc. Aligning metric views with this pattern lets future extensions 
(e.g., schema modes, temp/materialized variants) add fields to the logical plan 
without changing the runnable's shape, and gives downstream rules a single 
match target to dispatch from.
    - Carrying pre-parsed `inputColumns` on the placeholder gives a stable, 
analyzer-friendly representation and decouples the resolver from the YAML 
serde. The resolver no longer needs a `ParserInterface` field for re-parsing 
expressions, and the per-column metadata conversion happens once at planning 
time.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. Internal refactor only.
    
    ### How was this patch tested?
    
    Existing test suites pass locally:
    - `MetricViewV2CatalogSuite` (31/31)
    - `SimpleMetricViewSuite` (19/19)
    - `MetricViewFactorySuite` (16/16)
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Co-authored using Claude Code.
    
    Closes #56010 from cloud-fan/SPARK-54119-followup.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../sql/metricview/logical/metricViewNodes.scala   | 56 +++++++++++++++--
 .../sql/metricview/util/MetricViewPlanner.scala    | 39 +++++++++---
 .../sql/catalyst/analysis/ResolveMetricView.scala  | 71 +++++++++-------------
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 15 +++++
 .../spark/sql/execution/SparkSqlParser.scala       |  3 +-
 .../sql/execution/command/metricViewCommands.scala | 14 ++---
 .../datasources/v2/CreateV2MetricViewExec.scala    |  4 +-
 .../datasources/v2/DataSourceV2Strategy.scala      |  9 ++-
 .../sql/execution/MetricViewV2CatalogSuite.scala   |  7 ++-
 9 files changed, 150 insertions(+), 68 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
index a7fa037a4b33..76163794db33 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/logical/metricViewNodes.scala
@@ -19,14 +19,62 @@ package org.apache.spark.sql.metricview.logical
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand, 
UnaryNode}
 import 
org.apache.spark.sql.catalyst.trees.TreePattern.{METRIC_VIEW_PLACEHOLDER, 
RESOLVED_METRIC_VIEW, TreePattern}
-import org.apache.spark.sql.metricview.serde.MetricView
+import org.apache.spark.sql.types.Metadata
+
+/**
+ * A parsed metric-view column, populated by
+ * [[org.apache.spark.sql.metricview.util.MetricViewPlanner]] from the YAML 
definition before the
+ * placeholder is handed to the analyzer. Carrying the parsed [[Expression]] 
(rather than the raw
+ * YAML descriptor) lets downstream resolution rules read a stable, 
analyzer-friendly
+ * representation without re-parsing.
+ */
+sealed trait InputColumn {
+  def name: String
+  def expr: Expression
+  def metadata: Metadata
+}
+
+case class DimensionInputColumn(
+    name: String,
+    expr: Expression,
+    metadata: Metadata) extends InputColumn
+
+case class MeasureInputColumn(
+    name: String,
+    expr: Expression,
+    metadata: Metadata) extends InputColumn
+
+/**
+ * Logical plan for `CREATE VIEW ... WITH METRICS`. This is the v1/v2-agnostic 
representation
+ * the parser returns; downstream analysis decides which runnable form it 
becomes:
+ *  - For the session catalog: 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]]
+ *    via an analyzer rule that fires once the identifier is resolved.
+ *  - For non-session v2 
[[org.apache.spark.sql.connector.catalog.ViewCatalog]]s: a
+ *    `CreateV2MetricViewExec` produced by `DataSourceV2Strategy`.
+ *
+ * Splitting this from the runnable command lets the parser return a single 
logical shape
+ * regardless of target catalog (instead of pre-committing to a runnable 
command at parse
+ * time), and gives downstream rules a single match target to dispatch from.
+ */
+case class CreateMetricView(
+    child: LogicalPlan,
+    userSpecifiedColumns: Seq[(String, Option[String])],
+    comment: Option[String],
+    properties: Map[String, String],
+    originalText: String,
+    allowExisting: Boolean,
+    replace: Boolean) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan = {
+    copy(child = newChild)
+  }
+}
 
 case class MetricViewPlaceholder(
     metadata: CatalogTable,
-    desc: MetricView,
+    inputColumns: Seq[InputColumn],
     outputMetrics: Seq[Attribute],
     child: LogicalPlan,
     isCreate: Boolean = false) extends UnaryNode {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
index 2ac20ebeaa95..ed7713819c15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/util/MetricViewPlanner.scala
@@ -24,24 +24,26 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.metricview.logical.MetricViewPlaceholder
-import org.apache.spark.sql.metricview.serde.{AssetSource, MetricView, 
MetricViewFactory, MetricViewValidationException, 
MetricViewYAMLParsingException, SQLSource}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.metricview.logical.{DimensionInputColumn, 
InputColumn, MeasureInputColumn, MetricViewPlaceholder}
+import org.apache.spark.sql.metricview.serde.{AssetSource, 
DimensionExpression, JsonUtils, MeasureExpression, MetricView, 
MetricViewFactory, MetricViewValidationException, 
MetricViewYAMLParsingException, SQLSource}
+import org.apache.spark.sql.types.{Metadata, StructType}
 
 object MetricViewPlanner {
 
   def planWrite(
       metadata: CatalogTable,
       yaml: String,
-      sqlParser: ParserInterface): MetricViewPlaceholder = {
+      sqlParser: ParserInterface): (MetricViewPlaceholder, MetricView) = {
     val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
-    MetricViewPlaceholder(
+    val inputColumns = buildInputColumns(metricView, sqlParser)
+    val placeholder = MetricViewPlaceholder(
       metadata,
-      metricView,
+      inputColumns,
       Seq.empty,
       dataModelPlan,
       isCreate = true
     )
+    (placeholder, metricView)
   }
 
   def planRead(
@@ -50,14 +52,37 @@ object MetricViewPlanner {
       sqlParser: ParserInterface,
       expectedSchema: StructType): MetricViewPlaceholder = {
     val (metricView, dataModelPlan) = parseYAML(yaml, sqlParser)
+    val inputColumns = buildInputColumns(metricView, sqlParser)
     MetricViewPlaceholder(
       metadata,
-      metricView,
+      inputColumns,
       DataTypeUtils.toAttributes(expectedSchema),
       dataModelPlan
     )
   }
 
+  /**
+   * Parses every column's `MeasureExpression` / `DimensionExpression` from 
the YAML descriptor
+   * into a typed [[InputColumn]] (with the SQL expression already parsed) so 
downstream
+   * resolution rules read a stable representation rather than re-parsing the 
YAML.
+   * Column metadata is converted once here from the canonical 
`ColumnMetadata` to Spark's
+   * `Metadata`, preserving the per-column annotations (e.g. dimension / 
measure type marker,
+   * source expression text) the resolver attaches to output attributes.
+   */
+  private def buildInputColumns(
+      metricView: MetricView,
+      sqlParser: ParserInterface): Seq[InputColumn] = {
+    metricView.select.map { col =>
+      val md = Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata))
+      col.expression match {
+        case DimensionExpression(expr) =>
+          DimensionInputColumn(col.name, sqlParser.parseExpression(expr), md)
+        case MeasureExpression(expr) =>
+          MeasureInputColumn(col.name, sqlParser.parseExpression(expr), md)
+      }
+    }
+  }
+
   private def parseYAML(
       yaml: String,
       sqlParser: ParserInterface): (MetricView, LogicalPlan) = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
index a7763c2799a6..760a3d09f467 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMetricView.scala
@@ -23,13 +23,11 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions._
 import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, 
Measure}
-import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.METRIC_VIEW_PLACEHOLDER
-import org.apache.spark.sql.metricview.logical.{MetricViewPlaceholder, 
ResolvedMetricView}
-import org.apache.spark.sql.metricview.serde.{Column => CanonicalColumn, 
DimensionExpression, JsonUtils, MeasureExpression, MetricView => 
CanonicalMetricView}
-import org.apache.spark.sql.types.{DataType, Metadata, MetadataBuilder}
+import org.apache.spark.sql.metricview.logical.{DimensionInputColumn, 
InputColumn, MeasureInputColumn, MetricViewPlaceholder, ResolvedMetricView}
+import org.apache.spark.sql.types.DataType
 
 /**
  * Analysis rule for resolving metric view operations (CREATE and SELECT).
@@ -165,7 +163,6 @@ import org.apache.spark.sql.types.{DataType, Metadata, 
MetadataBuilder}
  * into resolved logical plans that can be further optimized and executed.
  */
 case class ResolveMetricView(session: SparkSession) extends Rule[LogicalPlan] {
-  private def parser: ParserInterface = session.sessionState.sqlParser
   override def apply(plan: LogicalPlan): LogicalPlan = {
     if (!plan.containsPattern(METRIC_VIEW_PLACEHOLDER)) {
       return plan
@@ -176,7 +173,7 @@ case class ResolveMetricView(session: SparkSession) extends 
Rule[LogicalPlan] {
       // are aggregate functions, we need to use an Aggregate node and group 
by all
       // dimensions to get the output schema.
       case mvp: MetricViewPlaceholder if mvp.isCreate && mvp.child.resolved =>
-        val (dimensions, measures) = buildMetricViewOutput(mvp.desc)
+        val (dimensions, measures) = buildMetricViewOutput(mvp.inputColumns)
         Aggregate(
           // group by all dimensions
           dimensions.map(_.toAttribute).toSeq,
@@ -190,9 +187,11 @@ case class ResolveMetricView(session: SparkSession) 
extends Rule[LogicalPlan] {
       // Resolve the Aggregate node based on the metric view output and then 
replace
       // the AttributeReference of the metric view output to the actual 
expressions.
       case node @ MetricViewReadOperation(metricView) =>
-        // step 1: parse the metric view definition
+        // step 1: build typed dimension / measure columns from the 
placeholder's
+        // pre-parsed `inputColumns`. These were populated by 
`MetricViewPlanner` from the
+        // YAML descriptor so the resolver doesn't need to re-parse 
expressions here.
         val (dimensions, measures) =
-          parseMetricViewColumns(metricView.outputMetrics, 
metricView.desc.select)
+          buildMetricViewColumns(metricView.outputMetrics, 
metricView.inputColumns)
 
         // step 2: build the Project node containing the dimensions
         val dimensionExprs = dimensions.map(_.namedExpr)
@@ -235,50 +234,40 @@ case class ResolveMetricView(session: SparkSession) 
extends Rule[LogicalPlan] {
     }
   }
 
-  private def buildMetricViewOutput(metricView: CanonicalMetricView)
+  /**
+   * Builds the named expressions used by the CREATE-time aggregate (so the 
analyzer can
+   * derive the output schema). Reads pre-parsed expressions from 
[[InputColumn]]s.
+   */
+  private def buildMetricViewOutput(inputColumns: Seq[InputColumn])
   : (Seq[NamedExpression], Seq[NamedExpression]) = {
     val dimensions = new mutable.ArrayBuffer[NamedExpression]()
     val measures = new mutable.ArrayBuffer[NamedExpression]()
-    metricView.select.foreach { col =>
-      val metadata = new MetadataBuilder()
-        
.withMetadata(Metadata.fromJson(JsonUtils.toJson(col.getColumnMetadata)))
-        .build()
-      col.expression match {
-        case DimensionExpression(expr) =>
-          dimensions.append(
-            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = 
Some(metadata)))
-        case MeasureExpression(expr) =>
-          measures.append(
-            Alias(parser.parseExpression(expr), col.name)(explicitMetadata = 
Some(metadata)))
-      }
+    inputColumns.foreach {
+      case c: DimensionInputColumn =>
+        dimensions.append(Alias(c.expr, c.name)(explicitMetadata = 
Some(c.metadata)))
+      case c: MeasureInputColumn =>
+        measures.append(Alias(c.expr, c.name)(explicitMetadata = 
Some(c.metadata)))
     }
     (dimensions.toSeq, measures.toSeq)
   }
 
-  private def parseMetricViewColumns(
+  /**
+   * Pairs each pre-parsed [[InputColumn]] with the matching output attribute 
(by position)
+   * so the resolver can replace `MEASURE(<measure-name>)` references with the 
original
+   * aggregate expression while preserving exprId and data type.
+   */
+  private def buildMetricViewColumns(
       metricViewOutput: Seq[Attribute],
-      columns: Seq[CanonicalColumn]
+      inputColumns: Seq[InputColumn]
   ): (Seq[MetricViewDimension], Seq[MetricViewMeasure]) = {
     val dimensions = new mutable.ArrayBuffer[MetricViewDimension]()
     val measures = new mutable.ArrayBuffer[MetricViewMeasure]()
-    metricViewOutput.zip(columns).foreach { case (attr, column) =>
-      column.expression match {
-        case DimensionExpression(expr) =>
-          dimensions.append(
-            MetricViewDimension(
-              attr.name,
-              parser.parseExpression(expr),
-              attr.exprId,
-              attr.dataType)
-          )
-        case MeasureExpression(expr) =>
-          measures.append(
-            MetricViewMeasure(
-              attr.name,
-              parser.parseExpression(expr),
-              attr.exprId,
-              attr.dataType)
-          )
+    metricViewOutput.zip(inputColumns).foreach { case (attr, column) =>
+      column match {
+        case c: DimensionInputColumn =>
+          dimensions.append(MetricViewDimension(attr.name, c.expr, 
attr.exprId, attr.dataType))
+        case c: MeasureInputColumn =>
+          measures.append(MetricViewMeasure(attr.name, c.expr, attr.exprId, 
attr.dataType))
       }
     }
     (dimensions.toSeq, measures.toSeq)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 8f0b664e10c5..8774feb4d911 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -36,6 +36,7 @@ import 
org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1,
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.connector.V1Function
+import org.apache.spark.sql.metricview.logical.CreateMetricView
 import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType, 
StructField, StructType}
 import org.apache.spark.util.SparkStringUtils
 
@@ -577,6 +578,20 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
         viewType = PersistedView,
         viewSchemaMode = viewSchemaMode)
 
+    // CREATE VIEW ... WITH METRICS on the session catalog -> V1 runnable 
command. Non-session
+    // v2 catalogs leave [[CreateMetricView]] in place for 
`DataSourceV2Strategy` to dispatch
+    // to `CreateV2MetricViewExec`.
+    case cm @ CreateMetricView(ResolvedIdentifier(catalog, _), _, _, _, _, _, 
_)
+        if isSessionCatalog(catalog) =>
+      CreateMetricViewCommand(
+        cm.child,
+        cm.userSpecifiedColumns,
+        cm.comment,
+        cm.properties,
+        cm.originalText,
+        cm.allowExisting,
+        cm.replace)
+
     // ViewCatalog catalogs are handled by the v2 strategy (enumerates via 
listViews); we skip
     // the match here so the plan flows through unchanged. Only non-session, 
non-ViewCatalog
     // catalogs hit the MISSING_CATALOG_ABILITY.VIEWS rejection.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index b4ece7329094..0ecbf7609d2b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.metricview.logical.CreateMetricView
 import org.apache.spark.sql.types.{DataType, StringType}
 import org.apache.spark.util.Utils.getUriBuilder
 
@@ -863,7 +864,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       .getOrElse(Map.empty)
     val codeLiteral = visitCodeLiteral(ctx.codeLiteral())
 
-    CreateMetricViewCommand(
+    CreateMetricView(
       withIdentClause(ctx.identifierReference(), UnresolvedIdentifier(_)),
       userSpecifiedColumns,
       visitCommentSpecList(ctx.commentSpec()),
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
index 5937ad2300cd..3ae6dcad174f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/metricViewCommands.scala
@@ -149,9 +149,8 @@ object MetricViewHelper {
   /**
    * Analyzes a metric-view YAML body so the create / alter path can capture 
the source plan
    * and its dependencies. Returns the analyzed plan together with the parsed 
[[MetricView]]
-   * descriptor (the latter is grabbed off the un-analyzed 
[[MetricViewPlaceholder]] before
-   * the analyzer rewrites it away, so callers needing the descriptor for 
property emission
-   * don't have to re-parse the YAML).
+   * descriptor (returned alongside the placeholder by 
[[MetricViewPlanner.planWrite]] so
+   * callers needing the descriptor for property emission don't have to 
re-parse the YAML).
    *
    * `nameParts` is the multi-part target identifier (catalog + namespace + 
table). The synthetic
    * [[CatalogTable]] used as analysis context still carries a 
[[TableIdentifier]] (capped at
@@ -189,12 +188,11 @@ object MetricViewHelper {
       schema = new StructType(),
       viewOriginalText = Some(viewText),
       viewText = Some(viewText))
-    val placeholder = MetricViewPlanner.planWrite(
+    // `planWrite` returns the placeholder (carrying pre-parsed 
`inputColumns`) and the
+    // parsed YAML descriptor separately, so the caller can read the 
descriptor for
+    // property emission (e.g. `metric_view.*` keys) without keeping it on the 
placeholder.
+    val (placeholder, metricView) = MetricViewPlanner.planWrite(
       tableMeta, viewText, session.sessionState.sqlParser)
-    // Grab the parsed descriptor BEFORE analysis -- the placeholder gets 
replaced by
-    // ResolvedMetricView during analyzer rules, after which `MetricView` is 
no longer
-    // recoverable from the plan tree.
-    val metricView = placeholder.desc
     val analyzed = analyzer.executeAndCheck(placeholder, new 
QueryPlanningTracker)
     ViewHelper.verifyTemporaryObjectsNotExists(
       isTemporary = false, nameParts, analyzed, Seq.empty)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala
index d25239aac949..fb27e9feaa0b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2MetricViewExec.scala
@@ -30,8 +30,8 @@ import 
org.apache.spark.sql.connector.catalog.{DependencyList, Identifier, Table
  * METRIC_VIEW`) via the [[V2ViewPreparation]] hooks.
  *
  * Routed by [[DataSourceV2Strategy]] from
- * [[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] when the 
resolved
- * catalog is a non-session v2 catalog.
+ * [[org.apache.spark.sql.metricview.logical.CreateMetricView]] when the 
resolved catalog
+ * is a non-session v2 catalog.
  */
 case class CreateV2MetricViewExec(
     catalog: ViewCatalog,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index ed067a3f00d1..4fd7d993cc3d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -44,11 +44,12 @@ import 
org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat
 import org.apache.spark.sql.connector.write.{V1Write, Write}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, 
LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, 
ScalarSubquery => ExecScalarSubquery, SparkPlan, SparkStrategy => Strategy}
-import org.apache.spark.sql.execution.command.{CommandUtils, 
CreateMetricViewCommand, MetricViewHelper}
+import org.apache.spark.sql.execution.command.{CommandUtils, MetricViewHelper}
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, 
LogicalRelationWithTable, PushableColumnAndNestedColumn}
 import 
org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource,
 WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
+import org.apache.spark.sql.metricview.logical.CreateMetricView
 import org.apache.spark.sql.sources.{BaseRelation, TableScan}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.ArrayImplicits._
@@ -326,8 +327,10 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
     // CREATE VIEW ... WITH METRICS on a non-session v2 catalog. Routes the 
metric-view path
     // through `CreateV2MetricViewExec`, which extends `V2ViewPreparation` to 
share the
     // `IF NOT EXISTS` short-circuit, `OR REPLACE`, and cross-type-collision 
decoding with
-    // `CreateV2ViewExec`. Session-catalog dispatch stays in 
`CreateMetricViewCommand.run`.
-    case CreateMetricViewCommand(
+    // `CreateV2ViewExec`. Session-catalog dispatch happens earlier in 
`ResolveSessionCatalog`,
+    // which rewrites `CreateMetricView` (the parser's v1/v2-agnostic logical 
plan) to
+    // `CreateMetricViewCommand` for v1 execution.
+    case CreateMetricView(
         ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment, 
properties,
         originalText, allowExisting, replace) if 
!CatalogV2Util.isSessionCatalog(catalog) =>
       val viewCatalog = catalog match {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala
index a90adb8fee61..b1f011c9cdac 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/MetricViewV2CatalogSuite.scala
@@ -29,8 +29,11 @@ import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.Metadata
 
 /**
- * Tests that exercise 
[[org.apache.spark.sql.execution.command.CreateMetricViewCommand]] on a
- * non-session V2 catalog. Metric views are persisted through the same 
[[ViewCatalog]] interface
+ * Tests that exercise 
[[org.apache.spark.sql.metricview.logical.CreateMetricView]] on a
+ * non-session V2 catalog (routed through
+ * [[org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy]] to
+ * [[org.apache.spark.sql.execution.datasources.v2.CreateV2MetricViewExec]]).
+ * Metric views are persisted through the same [[ViewCatalog]] interface
  * as plain views; the only marker that distinguishes them is `PROP_TABLE_TYPE 
= METRIC_VIEW`
  * plus the typed `viewDependencies` field on [[ViewInfo]]. The recording 
catalog used here is a
  * minimal [[TableViewCatalog]] so the same instance can also host the source 
table referenced by


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to