This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dc47def56265 [SPARK-54403][SQL][METRIC VIEW] Add YAML serde 
infrastructure for metric views
dc47def56265 is described below

commit dc47def562652b6d35a6ecb6600373ed645326bf
Author: Linhong Liu <[email protected]>
AuthorDate: Wed Dec 10 11:31:06 2025 +0800

    [SPARK-54403][SQL][METRIC VIEW] Add YAML serde infrastructure for metric 
views
    
    ### What changes were proposed in this pull request?
    This PR adds the complete serialization/deserialization infrastructure for 
parsing metric view YAML definitions:
    - Add Jackson YAML dependencies to pom.xml
    - Implement canonical model for metric views:
      - Column, Expression (Dimension/Measure), MetricView, Source
      - YAMLVersion validation and exception types
    - Implement version-specific serde (v0.1):
      - YAML deserializer/serializer
      - Base classes for extensibility
    - Add JSON utilities for metadata serialization
    
    ### Why are the changes needed?
    [SPIP: Metrics & semantic modeling in Spark](https://docs.google.com/document/d/1xVTLijvDTJ90lZ_ujwzf9HvBJgWg0mY6cYM44Fcghl0/edit?tab=t.0#heading=h.4iogryr5qznc)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    ```
    build/sbt "catalyst/testOnly org.apache.spark.sql.metricview.serde.MetricViewFactorySuite"
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: [Claude Code](https://claude.com/claude-code)
    Co-Authored-By: Claude <noreply@anthropic.com>
    
    Closes #53146 from linhongliu-db/metric-view-serde.
    
    Authored-by: Linhong Liu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 sql/catalyst/pom.xml                               |   4 +
 .../spark/sql/metricview/serde/JsonUtils.scala     |  34 ++
 .../sql/metricview/serde/MetricViewCanonical.scala | 170 ++++++++++
 .../sql/metricview/serde/MetricViewFactory.scala   |  69 ++++
 .../sql/metricview/serde/MetricViewSerDeBase.scala | 189 +++++++++++
 .../sql/metricview/serde/MetricViewSerDeV01.scala  |  83 +++++
 .../metricview/serde/MetricViewFactorySuite.scala  | 373 +++++++++++++++++++++
 7 files changed, 922 insertions(+)

diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 67ee98bd9430..9b7fb89ddd9e 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -130,6 +130,10 @@
       <artifactId>univocity-parsers</artifactId>
       <type>jar</type>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.ws.xmlschema</groupId>
       <artifactId>xmlschema-core</artifactId>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala
new file mode 100644
index 000000000000..17a4dac72b7c
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+private[sql] object JsonUtils {
+  // Jackson mapper with Scala module support; created lazily on first use and then reused.
+  private lazy val mapper: ObjectMapper =
+    new ObjectMapper().registerModule(DefaultScalaModule)
+
+  /**
+   * Serializes `obj` to a JSON string with the shared mapper.
+   *
+   * @param obj the value to serialize
+   * @return the JSON text Jackson produces for `obj`
+   */
+  def toJson[T: Manifest](obj: T): String = mapper.writeValueAsString(obj)
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala
new file mode 100644
index 000000000000..2e76a13741d0
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import scala.util.{Failure, Success, Try}
+
+import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonInclude, 
JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.metricview.serde.ColumnType.ColumnType
+import org.apache.spark.sql.metricview.serde.SourceType.SourceType
+
+/**
+ * Common base class of the metric view serde exceptions declared in this file.
+ *
+ * @param message description of the failure
+ * @param cause optional underlying error; handed to [[Exception]] as `null` when absent
+ */
+private[sql] sealed abstract class MetricViewSerdeException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends Exception(message, cause.orNull)
+
+/**
+ * Signals that a metric view definition was rejected, e.g. an empty or unparsable source
+ * or a column expression that exceeds the allowed size.
+ */
+private[sql] case class MetricViewValidationException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends MetricViewSerdeException(message, cause)
+
+/**
+ * Thrown by `SourceType.fromString` and `ColumnType.fromString` when the given string
+ * matches none of the enumeration values.
+ */
+private[sql] case class MetricViewFromProtoException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends MetricViewSerdeException(message, cause)
+
+/** Wraps non-fatal failures raised while reading or writing metric view YAML. */
+private[sql] case class MetricViewYAMLParsingException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends MetricViewSerdeException(message, cause)
+
+/**
+ * Expression types in a Metric View. The hierarchy is sealed; its two variants are
+ * [[DimensionExpression]] and [[MeasureExpression]].
+ */
+private[sql] sealed trait Expression {
+  /** The expression text. */
+  def expr: String
+}
+
+/** Dimension expression representing a scalar value. */
+private[sql] case class DimensionExpression(expr: String) extends Expression
+
+/** Measure expression representing an aggregated value. */
+private[sql] case class MeasureExpression(expr: String) extends Expression
+
+/** Kinds of source a metric view can be defined over. */
+private[sql] object SourceType extends Enumeration {
+  type SourceType = Value
+  val ASSET, SQL = Value
+
+  /**
+   * Looks up a source type by name, ignoring case.
+   *
+   * @param sourceType the name to resolve
+   * @return the matching enumeration value
+   * @throws MetricViewFromProtoException if no value has that name
+   */
+  def fromString(sourceType: String): SourceType = {
+    values.find(candidate => candidate.toString.equalsIgnoreCase(sourceType)) match {
+      case Some(found) => found
+      case None => throw MetricViewFromProtoException(s"Unsupported source type: $sourceType")
+    }
+  }
+}
+
+/** A source in the Metric View; every implementation reports its `SourceType`. */
+private[sql] sealed trait Source {
+  def sourceType: SourceType
+}
+
+/** Asset source, representing a catalog table, view, or Metric View, etc. */
+private[sql] case class AssetSource(name: String) extends Source {
+  override val sourceType: SourceType = SourceType.ASSET
+
+  // Renders as the bare asset name.
+  override def toString: String = name
+}
+
+/** SQL source, representing a SQL query. */
+private[sql] case class SQLSource(sql: String) extends Source {
+  override val sourceType: SourceType = SourceType.SQL
+
+  // Renders as the bare query text.
+  override def toString: String = sql
+}
+
+private[sql] object Source {
+  /**
+   * Classifies the source text of a metric view.
+   *
+   * The text is first tried as a table identifier; if that parses it becomes an
+   * [[AssetSource]]. Otherwise it is tried as a query and becomes a [[SQLSource]].
+   *
+   * @param sourceText the raw source text from the metric view definition
+   * @return the asset or SQL source wrapping `sourceText` unchanged
+   * @throws MetricViewValidationException if the text is null, empty or blank, or if it
+   *         parses neither as a table identifier nor as a query (the query parser's error
+   *         is attached as the cause)
+   */
+  def apply(sourceText: String): Source = {
+    // Reject missing and whitespace-only text up front so callers get a validation error
+    // rather than a NullPointerException or a confusing "Invalid source" message.
+    if (sourceText == null || sourceText.trim.isEmpty) {
+      throw MetricViewValidationException("Source cannot be empty")
+    }
+    if (Try(CatalystSqlParser.parseTableIdentifier(sourceText)).isSuccess) {
+      AssetSource(sourceText)
+    } else {
+      Try(CatalystSqlParser.parseQuery(sourceText)) match {
+        case Success(_) => SQLSource(sourceText)
+        case Failure(queryEx) =>
+          throw MetricViewValidationException(s"Invalid source: $sourceText", Some(queryEx))
+      }
+    }
+  }
+}
+
+/**
+ * A column of the canonical metric view.
+ *
+ * @param name the column name
+ * @param expression the dimension or measure expression behind the column
+ * @param ordinal the position assigned to the column
+ */
+private[sql] case class Column(
+    name: String,
+    expression: Expression,
+    ordinal: Int) {
+
+  /** The column type implied by the kind of `expression`. */
+  def columnType: ColumnType = expression match {
+    case _: DimensionExpression => ColumnType.Dimension
+    case _: MeasureExpression => ColumnType.Measure
+    case other =>
+      throw MetricViewValidationException(
+        s"Unsupported expression type: ${other.getClass.getName}")
+  }
+
+  /**
+   * Builds the metadata (type name and expression text) for this column.
+   *
+   * @throws MetricViewValidationException if the expression text is longer than
+   *         `Constants.MAXIMUM_PROPERTY_SIZE`
+   */
+  def getColumnMetadata: ColumnMetadata = {
+    val exprText = expression.expr
+    val limit = Constants.MAXIMUM_PROPERTY_SIZE
+    if (exprText.length <= limit) {
+      ColumnMetadata(columnType.toString, exprText)
+    } else {
+      throw MetricViewValidationException(
+        s"Expression length ${exprText.length} exceeds maximum allowed size " +
+        s"$limit for column '$name'")
+    }
+  }
+}
+
+/** Column kinds of a metric view, named "dimension" and "measure". */
+private[sql] object ColumnType extends Enumeration {
+  type ColumnType = Value
+  val Dimension: ColumnType = Value("dimension")
+  val Measure: ColumnType = Value("measure")
+
+  /**
+   * Looks up a column type by name, ignoring case.
+   *
+   * @param columnType the name to resolve
+   * @return the matching enumeration value
+   * @throws MetricViewFromProtoException if no value has that name
+   */
+  def fromString(columnType: String): ColumnType = {
+    values.find(candidate => candidate.toString.equalsIgnoreCase(columnType)) match {
+      case Some(found) => found
+      case None => throw MetricViewFromProtoException(s"Unsupported column type: $columnType")
+    }
+  }
+}
+
+/**
+ * Per-column metadata produced by `Column.getColumnMetadata`. The two fields are bound to
+ * the property names defined in `Constants`; unknown properties are ignored when reading.
+ *
+ * @param columnType the column type name (the string form of a `ColumnType` value)
+ * @param expr the column's expression text
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(Include.NON_ABSENT)
+private[sql] case class ColumnMetadata(
+    @JsonProperty(value = Constants.COLUMN_TYPE_PROPERTY_KEY, required = true)
+    columnType: String, // "type" -> "metric_view.type"
+    @JsonProperty(value = Constants.COLUMN_EXPR_PROPERTY_KEY, required = true)
+    expr: String // "expr" -> "metric_view.expr"
+)
+
+/**
+ * Minimal binding used to read only the "version" field of a YAML document;
+ * every other field is ignored.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+private[sql] case class YAMLVersion(version: String)
+
+/**
+ * Canonical, version-independent form of a metric view.
+ *
+ * @param version the version string of the definition (e.g. "0.1")
+ * @param from the source the view is defined over
+ * @param where optional filter text
+ * @param select the columns of the view; `MetricViewBase.toCanonical` emits all dimensions
+ *               followed by all measures
+ */
+private[sql] case class MetricView(
+    version: String,
+    from: Source,
+    where: Option[String] = None,
+    select: Seq[Column])
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala
new file mode 100644
index 000000000000..ced4c7e635e3
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import scala.util.control.NonFatal
+
+/** Entry points for converting between metric view YAML text and the canonical model. */
+private[sql] object MetricViewFactory {
+
+  /**
+   * Runs `body`, rethrowing any [[MetricViewSerdeException]] untouched and wrapping every
+   * other non-fatal error in a [[MetricViewYAMLParsingException]] whose message starts
+   * with "Failed to " followed by `action`.
+   */
+  private def wrappingFailures[A](action: String)(body: => A): A = {
+    try {
+      body
+    } catch {
+      case serdeError: MetricViewSerdeException =>
+        throw serdeError
+      case NonFatal(other) =>
+        throw MetricViewYAMLParsingException(
+          s"Failed to $action: ${other.getMessage}",
+          Some(other))
+    }
+  }
+
+  /**
+   * Parses a YAML metric view definition into the canonical [[MetricView]].
+   *
+   * The "version" field is read first and selects the version-specific parser.
+   *
+   * @param yamlContent the YAML text of the definition
+   * @throws MetricViewValidationException if the version is not supported
+   * @throws MetricViewYAMLParsingException if the YAML cannot be read
+   */
+  def fromYAML(yamlContent: String): MetricView = wrappingFailures("parse YAML") {
+    val declaredVersion = YamlMapperProviderV01.mapperWithAllFields
+      .readValue(yamlContent, classOf[YAMLVersion])
+      .version
+    if (declaredVersion == "0.1") {
+      MetricViewYAMLDeserializerV01.parseYaml(yamlContent).toCanonical
+    } else {
+      throw MetricViewValidationException(s"Invalid YAML version: $declaredVersion")
+    }
+  }
+
+  /**
+   * Renders a canonical [[MetricView]] as YAML text in the syntax of its version.
+   *
+   * @param metricView the canonical metric view to serialize
+   * @throws MetricViewYAMLParsingException if conversion or serialization fails with a
+   *         non-serde error
+   */
+  def toYAML(metricView: MetricView): String = wrappingFailures("serialize to YAML") {
+    MetricViewBase.fromCanonical(metricView) match {
+      case v01: MetricViewV01 if v01.version == "0.1" =>
+        MetricViewYAMLSerializerV01.toYaml(v01)
+      case _ =>
+        throw MetricViewValidationException(s"Invalid YAML version: ${metricView.version}")
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala
new file mode 100644
index 000000000000..137e34e53a9d
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.{YAMLFactory, YAMLGenerator}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.yaml.snakeyaml.DumperOptions
+
+/** Constants shared by the metric view serde classes. */
+private[sql] object Constants {
+  // Maximum length of a column expression; enforced by Column.getColumnMetadata.
+  final val MAXIMUM_PROPERTY_SIZE: Int = 1 * 1024
+  // Property names that ColumnMetadata binds its two fields to.
+  // NOTE(review): these are used as @JsonProperty annotation arguments, which presumably
+  // requires them to stay constant `final val`s without a type ascription -- confirm before
+  // changing their form.
+  final val COLUMN_TYPE_PROPERTY_KEY = "metric_view.type"
+  final val COLUMN_EXPR_PROPERTY_KEY = "metric_view.expr"
+}
+
+/** Supplies the Jackson mapper used to read and write metric view YAML. */
+private[sql] trait YamlMapperProviderBase {
+  // Built on first access and reused afterwards: constructing an ObjectMapper and
+  // registering modules is comparatively expensive, and this mapper is never reconfigured
+  // after construction.
+  private lazy val sharedMapper: ObjectMapper = buildMapper()
+
+  /**
+   * The YAML mapper of this provider. The same instance is returned on every call, so
+   * callers must not change its configuration.
+   *
+   * @return a mapper that writes block-style YAML with minimal quoting, omits null and
+   *         empty values, and supports Scala types
+   */
+  def mapperWithAllFields: ObjectMapper = sharedMapper
+
+  // Assembles the YAML factory and mapper configuration.
+  private def buildMapper(): ObjectMapper = {
+    val options = new DumperOptions()
+    // Set flow style to BLOCK for better readability (each key-value pair on separate lines)
+    options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
+    // Set indentation to 2 spaces
+    options.setIndent(2)
+    // Set indicator indentation to 2 spaces for list/dict indicators
+    options.setIndicatorIndent(2)
+    // Enable indentation with indicators for better readability
+    options.setIndentWithIndicator(true)
+    // Disable pretty flow so that it doesn't add unnecessary newlines after dashes
+    options.setPrettyFlow(false)
+
+    val yamlFactory = YAMLFactory.builder()
+      // Minimize quotes around strings when possible
+      .configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true)
+      // Don't force numbers to be quoted as strings (preserve numeric types)
+      .configure(YAMLGenerator.Feature.ALWAYS_QUOTE_NUMBERS_AS_STRINGS, false)
+      // Don't write YAML document start marker (---)
+      .configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)
+      // Disable native type IDs and use explicit type instead
+      .configure(YAMLGenerator.Feature.USE_NATIVE_TYPE_ID, false)
+      .dumperOptions(options)
+      .build()
+
+    new ObjectMapper(yamlFactory)
+      // Exclude null values as well as empty collections/strings from serialized output.
+      // A single NON_EMPTY setting is enough: each call replaces the previous default
+      // inclusion, and NON_EMPTY already covers nulls.
+      .setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY)
+      .registerModule(DefaultScalaModule)
+  }
+}
+
+/**
+ * YAML parsing logic shared by the version-specific metric view deserializers.
+ * Implementations choose the mapper provider and the class the YAML is bound to.
+ *
+ * @tparam T the version-specific type produced by `parseYaml`
+ */
+private[sql] trait BaseMetricViewYAMLDeserializer[T] {
+  /** Supplies the YAML mapper used for deserialization; chosen by each implementation. */
+  protected def yamlMapperProvider: YamlMapperProviderBase
+
+  /** The class YAML content is bound to; supplied by each version-specific implementation. */
+  protected def getTargetClass: Class[T]
+
+  /**
+   * Parse YAML content into the target type.
+   *
+   * @param yamlContent the YAML text to parse
+   * @return the parsed value of type T
+   * @throws MetricViewYAMLParsingException if reading fails with any non-fatal error
+   */
+  def parseYaml(yamlContent: String): T =
+    try {
+      val mapper = yamlMapperProvider.mapperWithAllFields
+      mapper.readValue(yamlContent, getTargetClass)
+    } catch {
+      case NonFatal(cause) =>
+        val reason = s"Failed to parse YAML: ${cause.getMessage}"
+        throw MetricViewYAMLParsingException(reason, Some(cause))
+    }
+}
+
+/**
+ * YAML writing logic shared by the version-specific metric view serializers.
+ *
+ * @tparam T the version-specific type accepted by `toYaml`
+ */
+private[sql] trait BaseMetricViewYAMLSerializer[T] {
+  /** Supplies the YAML mapper used for serialization; chosen by each implementation. */
+  protected def yamlMapperProvider: YamlMapperProviderBase
+
+  /**
+   * Serialize `obj` to YAML text.
+   *
+   * @param obj the version-specific metric view to write
+   * @return the YAML produced by the provider's mapper
+   * @throws MetricViewYAMLParsingException if writing fails with any non-fatal error
+   */
+  def toYaml(obj: T): String =
+    try {
+      val mapper = yamlMapperProvider.mapperWithAllFields
+      mapper.writeValueAsString(obj)
+    } catch {
+      case NonFatal(cause) =>
+        val reason = s"Failed to serialize to YAML: ${cause.getMessage}"
+        throw MetricViewYAMLParsingException(reason, Some(cause))
+    }
+}
+
+/** Version-independent view of a YAML column entry: a name plus an expression text. */
+private[sql] trait ColumnBase {
+  def name: String
+  def expr: String
+
+  /**
+   * Converts this entry to a canonical [[Column]].
+   *
+   * @param ordinal the position to assign to the column
+   * @param isDimension true to wrap `expr` as a dimension, false to wrap it as a measure
+   */
+  def toCanonical(ordinal: Int, isDimension: Boolean): Column =
+    Column(
+      name = name,
+      expression = if (isDimension) DimensionExpression(expr) else MeasureExpression(expr),
+      ordinal = ordinal)
+}
+
+/** Shape shared by every version-specific metric view definition. */
+private[sql] trait MetricViewBase {
+  def version: String
+  def source: String
+  def filter: Option[String]
+  def dimensions: Seq[ColumnBase]
+  def measures: Seq[ColumnBase]
+
+  /**
+   * Converts this definition to the canonical [[MetricView]].
+   *
+   * Different YAML versions could have different syntax to describe a MetricView, but all
+   * of them should be convertible to the same canonical form. For example, later versions
+   * may change the syntax of source to support multiple sources (e.g. joins, compared to
+   * the current single source), and the canonical form should be able to represent both.
+   *
+   * Ordinals run over the dimensions first (0 until dimensions.length) and continue over
+   * the measures (dimensions.length until dimensions.length + measures.length).
+   */
+  def toCanonical: MetricView = {
+    // Tag each entry with whether it is a dimension, then number the combined sequence.
+    val tagged = dimensions.map(_ -> true) ++ measures.map(_ -> false)
+    val columns = tagged.zipWithIndex.map {
+      case ((column, isDimension), ordinal) => column.toCanonical(ordinal, isDimension)
+    }
+    MetricView(
+      version = version,
+      from = Source(source),
+      where = filter,
+      select = columns)
+  }
+}
+
+private[sql] object MetricViewBase {
+  /**
+   * Creates the version-specific metric view that corresponds to a canonical one.
+   *
+   * @param canonical the canonical MetricView to convert from
+   * @return the version-specific MetricView selected by `canonical.version`
+   * @throws IllegalArgumentException if the version is not supported
+   */
+  def fromCanonical(canonical: MetricView): MetricViewBase = {
+    val requested = canonical.version
+    if (requested == "0.1") {
+      MetricViewV01.fromCanonical(canonical)
+    } else {
+      throw new IllegalArgumentException(s"Unsupported version: $requested")
+    }
+  }
+}
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala
new file mode 100644
index 000000000000..ee914785b612
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+/**
+ * A column entry in the v0.1 YAML syntax. Both properties are marked as required.
+ *
+ * @param name the column name
+ * @param expr the column's expression text
+ */
+private[sql] case class ColumnV01(
+    @JsonProperty(required = true) name: String,
+    @JsonProperty(required = true) expr: String
+) extends ColumnBase
+
+private[sql] object ColumnV01 {
+  /**
+   * Builds the v0.1 column entry for a canonical column, keeping its name and the text of
+   * its dimension or measure expression.
+   *
+   * @throws IllegalArgumentException if the expression is of any other type
+   */
+  def fromCanonical(canonical: Column): ColumnV01 = {
+    val exprText = canonical.expression match {
+      case dimension: DimensionExpression => dimension.expr
+      case measure: MeasureExpression => measure.expr
+      case other =>
+        throw new IllegalArgumentException(
+          s"Unsupported expression type: ${other.getClass.getName}")
+    }
+    ColumnV01(canonical.name, exprText)
+  }
+}
+
+/**
+ * A metric view definition in the v0.1 YAML syntax.
+ *
+ * @param version the version string (required property)
+ * @param source the source text (required property)
+ * @param filter optional filter text; defaults to None
+ * @param dimensions the dimension columns; defaults to empty
+ * @param measures the measure columns; defaults to empty
+ */
+private[sql] case class MetricViewV01(
+    @JsonProperty(required = true) version: String,
+    @JsonProperty(required = true) source: String,
+    filter: Option[String] = None,
+    dimensions: Seq[ColumnV01] = Seq.empty,
+    measures: Seq[ColumnV01] = Seq.empty) extends MetricViewBase
+
+private[sql] object MetricViewV01 {
+  /**
+   * Builds the v0.1 definition for a canonical metric view: the source is rendered via its
+   * `toString`, and the columns are split into dimensions and measures by expression type.
+   */
+  def fromCanonical(canonical: MetricView): MetricViewV01 = {
+    // Keeps only the columns whose expression satisfies `wanted`, preserving their order.
+    def columnsWhere(wanted: Expression => Boolean): Seq[ColumnV01] =
+      canonical.select.filter(column => wanted(column.expression)).map(ColumnV01.fromCanonical)
+
+    MetricViewV01(
+      version = canonical.version,
+      source = canonical.from.toString,
+      filter = canonical.where,
+      dimensions = columnsWhere(_.isInstanceOf[DimensionExpression]),
+      measures = columnsWhere(_.isInstanceOf[MeasureExpression]))
+  }
+}
+
+/** Mapper provider for the v0.1 YAML syntax; uses the base configuration unchanged. */
+private[sql] object YamlMapperProviderV01 extends YamlMapperProviderBase
+
+/** YAML deserializer for the v0.1 syntax; binds YAML text to [[MetricViewV01]]. */
+private[sql] object MetricViewYAMLDeserializerV01
+  extends BaseMetricViewYAMLDeserializer[MetricViewV01] {
+
+  override protected def yamlMapperProvider: YamlMapperProviderBase = YamlMapperProviderV01
+
+  override protected def getTargetClass: Class[MetricViewV01] = classOf[MetricViewV01]
+}
+
+/** YAML serializer for the v0.1 syntax; writes [[MetricViewV01]] values. */
+private[sql] object MetricViewYAMLSerializerV01
+  extends BaseMetricViewYAMLSerializer[MetricViewV01] {
+
+  override protected def yamlMapperProvider: YamlMapperProviderBase = YamlMapperProviderV01
+}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala
new file mode 100644
index 000000000000..77d2b1cfbf3e
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Test suite for [[MetricViewFactory]] YAML serialization and deserialization.
+ */
+class MetricViewFactorySuite extends SparkFunSuite {
+
+  test("fromYAML - parse basic metric view with asset source") {
+    val definition =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |measures:
+        |  - name: total_revenue
+        |    expr: SUM(revenue)
+        |""".stripMargin
+
+    val parsed = MetricViewFactory.fromYAML(definition)
+
+    // Top-level fields
+    assert(parsed.version === "0.1")
+    assert(parsed.from === AssetSource("my_table"))
+    assert(parsed.where.isEmpty)
+    assert(parsed.select.length === 2)
+
+    // The dimension is listed first ...
+    val dimension = parsed.select(0)
+    assert(dimension.name === "customer_id")
+    assert(dimension.expression === DimensionExpression("customer_id"))
+
+    // ... followed by the measure
+    val measure = parsed.select(1)
+    assert(measure.name === "total_revenue")
+    assert(measure.expression === MeasureExpression("SUM(revenue)"))
+  }
+
+  test("fromYAML - parse metric view with SQL source") {
+    val definition =
+      """version: "0.1"
+        |source: SELECT * FROM my_table WHERE year = 2024
+        |dimensions:
+        |  - name: product_id
+        |    expr: product_id
+        |measures:
+        |  - name: quantity_sold
+        |    expr: SUM(quantity)
+        |""".stripMargin
+
+    val parsed = MetricViewFactory.fromYAML(definition)
+
+    // A query (rather than a table identifier) must be classified as a SQL source.
+    assert(parsed.from === SQLSource("SELECT * FROM my_table WHERE year = 2024"))
+    assert(parsed.select.length === 2)
+  }
+
+  test("fromYAML - parse metric view with filter clause") {
+    val definition =
+      """version: "0.1"
+        |source: sales_data
+        |filter: year >= 2020 AND status = 'completed'
+        |dimensions:
+        |  - name: region
+        |    expr: region
+        |measures:
+        |  - name: sales
+        |    expr: SUM(amount)
+        |""".stripMargin
+
+    val parsed = MetricViewFactory.fromYAML(definition)
+
+    // The YAML `filter` field surfaces as the canonical `where` clause.
+    assert(parsed.where === Some("year >= 2020 AND status = 'completed'"))
+  }
+
+  test("fromYAML - parse metric view with multiple dimensions and measures") {
+    val definition =
+      """version: "0.1"
+        |source: transactions
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |  - name: product_id
+        |    expr: product_id
+        |  - name: region
+        |    expr: region
+        |measures:
+        |  - name: total_revenue
+        |    expr: SUM(revenue)
+        |  - name: avg_revenue
+        |    expr: AVG(revenue)
+        |  - name: transaction_count
+        |    expr: COUNT(*)
+        |""".stripMargin
+
+    val parsed = MetricViewFactory.fromYAML(definition)
+
+    assert(parsed.select.length === 6)
+
+    // The first three columns are the dimensions, the last three the measures.
+    val (leading, trailing) = parsed.select.splitAt(3)
+    assert(leading.forall(_.expression.isInstanceOf[DimensionExpression]))
+    assert(trailing.forall(_.expression.isInstanceOf[MeasureExpression]))
+  }
+
+  test("fromYAML - invalid YAML version") {
+    val definition =
+      """version: "99.9"
+        |source: my_table
+        |dimensions:
+        |  - name: id
+        |    expr: id
+        |""".stripMargin
+
+    // An unknown version is a validation error, not a parsing error.
+    val error = intercept[MetricViewValidationException](MetricViewFactory.fromYAML(definition))
+    assert(error.getMessage.contains("Invalid YAML version: 99.9"))
+  }
+
+  test("fromYAML - malformed YAML") {
+    val brokenYaml = """this is not valid yaml: [unclosed bracket"""
+
+    // Syntax errors from the YAML reader surface as a parsing exception.
+    val error = intercept[MetricViewYAMLParsingException](MetricViewFactory.fromYAML(brokenYaml))
+    assert(error.getMessage.contains("Failed to parse YAML"))
+  }
+
+  test("toYAML - serialize basic metric view") {
+    val columns = Seq(
+      Column("customer_id", DimensionExpression("customer_id"), 0),
+      Column("total_revenue", MeasureExpression("SUM(revenue)"), 1))
+    val view = MetricView(
+      version = "0.1",
+      from = AssetSource("my_table"),
+      where = None,
+      select = columns)
+
+    val rendered = MetricViewFactory.toYAML(view)
+
+    // The version may be written with or without quotes.
+    val hasVersion = rendered.contains("version: 0.1") || rendered.contains("version: \"0.1\"")
+    assert(hasVersion)
+    assert(rendered.contains("source: my_table"))
+    assert(rendered.contains("customer_id"))
+    assert(rendered.contains("total_revenue"))
+
+    // Verify it can be parsed back
+    val roundTripped = MetricViewFactory.fromYAML(rendered)
+    assert(roundTripped.version === "0.1")
+    assert(roundTripped.from.asInstanceOf[AssetSource].name === "my_table")
+  }
+
+  test("toYAML - serialize metric view with SQL source") {
+    val view = MetricView(
+      version = "0.1",
+      from = SQLSource("SELECT * FROM table WHERE id > 100"),
+      where = None,
+      select = Seq(Column("id", DimensionExpression("id"), 0)))
+
+    val rendered = MetricViewFactory.toYAML(view)
+
+    // The query text is written verbatim as the source.
+    assert(rendered.contains("source: SELECT * FROM table WHERE id > 100"))
+  }
+
+  test("toYAML - serialize metric view with filter clause") {
+    val columns = Seq(
+      Column("region", DimensionExpression("region"), 0),
+      Column("sales", MeasureExpression("SUM(amount)"), 1))
+    val view = MetricView(
+      version = "0.1",
+      from = AssetSource("sales"),
+      where = Some("year >= 2020"),
+      select = columns)
+
+    val rendered = MetricViewFactory.toYAML(view)
+
+    assert(rendered.contains("year >= 2020"))
+
+    // Verify it can be parsed back
+    val roundTripped = MetricViewFactory.fromYAML(rendered)
+    assert(roundTripped.where === Some("year >= 2020"))
+  }
+
+  test("roundtrip - fromYAML and toYAML preserve data") {
+    // Source document with a filter, two dimensions and two measures.
+    val originalYaml =
+      """version: "0.1"
+        |source: sales_table
+        |filter: status = 'active'
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |  - name: product_name
+        |    expr: product_name
+        |measures:
+        |  - name: total_revenue
+        |    expr: SUM(revenue)
+        |  - name: order_count
+        |    expr: COUNT(order_id)
+        |""".stripMargin
+
+    // Parse the YAML
+    val metricView = MetricViewFactory.fromYAML(originalYaml)
+
+    // Serialize it back
+    val serializedYaml = MetricViewFactory.toYAML(metricView)
+
+    // Parse again to verify
+    val reparsedMetricView = MetricViewFactory.fromYAML(serializedYaml)
+
+    // Verify all fields match
+    assert(reparsedMetricView.version === metricView.version)
+    assert(reparsedMetricView.from === metricView.from)
+    assert(reparsedMetricView.where === metricView.where)
+    assert(reparsedMetricView.select.length === metricView.select.length)
+
+    // The length assertion above ensures zip does not silently drop trailing columns.
+    reparsedMetricView.select.zip(metricView.select).foreach { case (col1, col2) =>
+      assert(col1.name === col2.name)
+      assert(col1.expression.expr === col2.expression.expr)
+      // Comparing classes checks that each column keeps its expression kind.
+      assert(col1.expression.getClass === col2.expression.getClass)
+    }
+  }
+
+  test("roundtrip - SQL source preservation") {
+    // The `source` value is a SQL query rather than a table name.
+    val originalYaml =
+      """version: "0.1"
+        |source: SELECT * FROM my_table WHERE year = 2024
+        |dimensions:
+        |  - name: id
+        |    expr: id
+        |measures:
+        |  - name: total
+        |    expr: SUM(value)
+        |""".stripMargin
+
+    // Parse, serialize, and parse again.
+    val metricView = MetricViewFactory.fromYAML(originalYaml)
+    val serializedYaml = MetricViewFactory.toYAML(metricView)
+    val reparsedMetricView = MetricViewFactory.fromYAML(serializedYaml)
+
+    // Match on the source type instead of isInstanceOf followed by asInstanceOf, so that a
+    // wrong source kind fails with a message naming the value actually produced.
+    reparsedMetricView.from match {
+      case sqlSource: SQLSource =>
+        assert(sqlSource.sql === "SELECT * FROM my_table WHERE year = 2024")
+      case other => fail(s"Expected a SQLSource but got: $other")
+    }
+  }
+
+  test("column ordinals are preserved") {
+    // Two dimensions followed by one measure.
+    val definition =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: col1
+        |    expr: col1
+        |  - name: col2
+        |    expr: col2
+        |measures:
+        |  - name: col3
+        |    expr: SUM(value)
+        |""".stripMargin
+
+    val parsed = MetricViewFactory.fromYAML(definition)
+
+    // Each of the first three columns is expected to carry its own position as ordinal.
+    (0 to 2).foreach { index =>
+      assert(parsed.select(index).ordinal === index)
+    }
+  }
+
+  test("column metadata extraction") {
+    // One dimension and one measure.
+    val definition =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |measures:
+        |  - name: revenue
+        |    expr: SUM(amount)
+        |""".stripMargin
+
+    val parsed = MetricViewFactory.fromYAML(definition)
+
+    // Column 0 is expected to report itself as a dimension with its original expression.
+    val dimMeta = parsed.select(0).getColumnMetadata
+    assert(dimMeta.columnType === "dimension")
+    assert(dimMeta.expr === "customer_id")
+
+    // Column 1 is expected to report itself as a measure with its original expression.
+    val measureMeta = parsed.select(1).getColumnMetadata
+    assert(measureMeta.columnType === "measure")
+    assert(measureMeta.expr === "SUM(amount)")
+  }
+
+  test("empty source validation") {
+    // The `source` key is present but holds an empty string.
+    val yamlWithEmptySource =
+      """version: "0.1"
+        |source: ""
+        |dimensions:
+        |  - name: id
+        |    expr: id
+        |""".stripMargin
+
+    // NOTE(review): intercept[Exception] accepts any exception type; the concrete type
+    // thrown for an empty source is not visible from this test - consider narrowing it.
+    intercept[Exception] {
+      MetricViewFactory.fromYAML(yamlWithEmptySource)
+    }
+  }
+
+  test("complex SQL expressions in measures") {
+    // Expressions containing function calls, arithmetic and a DISTINCT aggregate.
+    val yaml =
+      """version: "0.1"
+        |source: transactions
+        |dimensions:
+        |  - name: date
+        |    expr: DATE(timestamp)
+        |measures:
+        |  - name: weighted_avg
+        |    expr: SUM(amount * weight) / SUM(weight)
+        |  - name: distinct_customers
+        |    expr: COUNT(DISTINCT customer_id)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    // One dimension plus two measures; each expression string must come back unchanged.
+    assert(metricView.select.length === 3)
+    assert(metricView.select(0).expression.expr === "DATE(timestamp)")
+    assert(metricView.select(1).expression.expr === "SUM(amount * weight) / SUM(weight)")
+    assert(metricView.select(2).expression.expr === "COUNT(DISTINCT customer_id)")
+  }
+
+  test("special characters in column names and expressions") {
+    // Column names contain '.' and '$'; the expressions use backtick-quoted identifiers.
+    val definition =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: "customer.id"
+        |    expr: "`customer.id`"
+        |measures:
+        |  - name: "revenue_$"
+        |    expr: "SUM(`revenue_$`)"
+        |""".stripMargin
+
+    val parsed = MetricViewFactory.fromYAML(definition)
+
+    // The names are expected back without the surrounding YAML double quotes.
+    assert(parsed.select(0).name === "customer.id")
+    assert(parsed.select(1).name === "revenue_$")
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to