This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new dc47def56265 [SPARK-54403][SQL][METRIC VIEW] Add YAML serde
infrastructure for metric views
dc47def56265 is described below
commit dc47def562652b6d35a6ecb6600373ed645326bf
Author: Linhong Liu <[email protected]>
AuthorDate: Wed Dec 10 11:31:06 2025 +0800
[SPARK-54403][SQL][METRIC VIEW] Add YAML serde infrastructure for metric
views
### What changes were proposed in this pull request?
This PR adds the complete serialization/deserialization infrastructure for
parsing metric view YAML definitions:
- Add the Jackson YAML dependency (`jackson-dataformat-yaml`) to `sql/catalyst/pom.xml`
- Implement the canonical model for metric views:
  - Column, Expression (Dimension/Measure), MetricView, Source
  - YAMLVersion validation and exception types
- Implement version-specific serde (v0.1):
  - YAML deserializer/serializer
  - Base classes for extensibility
- Add JSON utilities for metadata serialization
### Why are the changes needed?
[SPIP: Metrics & semantic modeling in
Spark](https://docs.google.com/document/d/1xVTLijvDTJ90lZ_ujwzf9HvBJgWg0mY6cYM44Fcghl0/edit?tab=t.0#heading=h.4iogryr5qznc)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
```
build/sbt "catalyst/testOnly org.apache.spark.sql.metricview.serde.MetricViewFactorySuite"
```
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Closes #53146 from linhongliu-db/metric-view-serde.
Authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
sql/catalyst/pom.xml | 4 +
.../spark/sql/metricview/serde/JsonUtils.scala | 34 ++
.../sql/metricview/serde/MetricViewCanonical.scala | 170 ++++++++++
.../sql/metricview/serde/MetricViewFactory.scala | 69 ++++
.../sql/metricview/serde/MetricViewSerDeBase.scala | 189 +++++++++++
.../sql/metricview/serde/MetricViewSerDeV01.scala | 83 +++++
.../metricview/serde/MetricViewFactorySuite.scala | 373 +++++++++++++++++++++
7 files changed, 922 insertions(+)
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 67ee98bd9430..9b7fb89ddd9e 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -130,6 +130,10 @@
<artifactId>univocity-parsers</artifactId>
<type>jar</type>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.ws.xmlschema</groupId>
<artifactId>xmlschema-core</artifactId>
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala
new file mode 100644
index 000000000000..17a4dac72b7c
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/JsonUtils.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+private[sql] object JsonUtils {
+  /** Project-wide Jackson mapper with the Scala module registered, built on first use. */
+  private lazy val mapper: ObjectMapper =
+    new ObjectMapper().registerModule(DefaultScalaModule)
+
+  /**
+   * Serializes `obj` to a JSON string using the shared mapper. Scala types are handled
+   * through `DefaultScalaModule`.
+   *
+   * @param obj the value to serialize
+   * @return the JSON text
+   */
+  def toJson[T: Manifest](obj: T): String = mapper.writeValueAsString(obj)
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala
new file mode 100644
index 000000000000..2e76a13741d0
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewCanonical.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import scala.util.{Failure, Success, Try}
+
+import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonInclude,
JsonProperty}
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.metricview.serde.ColumnType.ColumnType
+import org.apache.spark.sql.metricview.serde.SourceType.SourceType
+
+/**
+ * Base class of every error raised while reading, writing or validating a metric view
+ * definition. The optional `cause` becomes the exception's cause (null when absent).
+ */
+private[sql] sealed abstract class MetricViewSerdeException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends Exception(message, cause.orNull)
+
+/**
+ * The definition failed validation, e.g. an unsupported version, an empty or unparseable
+ * source, or an over-long column expression.
+ */
+private[sql] case class MetricViewValidationException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends MetricViewSerdeException(message, cause)
+
+/** A source-type or column-type name could not be recognized. */
+private[sql] case class MetricViewFromProtoException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends MetricViewSerdeException(message, cause)
+
+/** Reading YAML into, or writing YAML from, a metric view model failed. */
+private[sql] case class MetricViewYAMLParsingException(
+    message: String,
+    cause: Option[Throwable] = None)
+  extends MetricViewSerdeException(message, cause)
+
+/** An expression attached to a metric view column; `expr` holds its text, e.g. `SUM(x)`. */
+private[sql] sealed trait Expression {
+  def expr: String
+}
+
+/** A dimension: an expression producing a scalar (non-aggregated) value. */
+private[sql] case class DimensionExpression(expr: String) extends Expression
+
+/** A measure: an expression producing an aggregated value. */
+private[sql] case class MeasureExpression(expr: String) extends Expression
+
+private[sql] object SourceType extends Enumeration {
+  type SourceType = Value
+  val ASSET, SQL = Value
+
+  /**
+   * Looks up a source type by name, ignoring case.
+   *
+   * @param sourceType the name to look up
+   * @throws MetricViewFromProtoException if no value has that name
+   */
+  def fromString(sourceType: String): SourceType = {
+    val matched = values.find(candidate => candidate.toString.equalsIgnoreCase(sourceType))
+    matched.getOrElse(throw MetricViewFromProtoException(s"Unsupported source type: $sourceType"))
+  }
+}
+
+/** Where a metric view reads its rows from. */
+private[sql] sealed trait Source {
+  def sourceType: SourceType
+}
+
+/**
+ * A named catalog object used as the source (a table, view, metric view, ...).
+ * Its string form is just the name.
+ */
+private[sql] case class AssetSource(name: String) extends Source {
+  val sourceType: SourceType = SourceType.ASSET
+
+  override def toString: String = name
+}
+
+/** A SQL query used as the source. Its string form is just the query text. */
+private[sql] case class SQLSource(sql: String) extends Source {
+  val sourceType: SourceType = SourceType.SQL
+
+  override def toString: String = sql
+}
+
+private[sql] object Source {
+  /**
+   * Classifies raw source text as either a catalog asset or a SQL query.
+   *
+   * Text that parses as a table identifier becomes an [[AssetSource]]; otherwise text that
+   * parses as a query becomes a [[SQLSource]]. In both cases the text is kept verbatim.
+   *
+   * @param sourceText the `source` value from a metric view definition
+   * @return the classified source
+   * @throws MetricViewValidationException if the text is null or blank, or is neither a table
+   *                                       identifier nor a valid query
+   */
+  def apply(sourceText: String): Source = {
+    // The text comes from deserialized YAML, where an explicit null value can arrive as null.
+    // Treat null and whitespace-only text as an empty source rather than failing with a
+    // NullPointerException or a less helpful "Invalid source" error.
+    if (Option(sourceText).forall(_.trim.isEmpty)) {
+      throw MetricViewValidationException("Source cannot be empty")
+    }
+    Try(CatalystSqlParser.parseTableIdentifier(sourceText)) match {
+      case Success(_) => AssetSource(sourceText)
+      case Failure(_) =>
+        // Not a plain identifier: accept it only if it is a syntactically valid query.
+        Try(CatalystSqlParser.parseQuery(sourceText)) match {
+          case Success(_) => SQLSource(sourceText)
+          case Failure(queryEx) =>
+            throw MetricViewValidationException(
+              s"Invalid source: $sourceText",
+              Some(queryEx)
+            )
+        }
+    }
+  }
+}
+
+/**
+ * A canonical metric view column.
+ *
+ * @param name       the column name
+ * @param expression the dimension or measure expression
+ * @param ordinal    the column's position within the metric view
+ */
+private[sql] case class Column(
+    name: String,
+    expression: Expression,
+    ordinal: Int) {
+  /**
+   * The column's kind, derived from its expression type. `Expression` is sealed and both
+   * subtypes are matched, so the compiler warns here if a new expression type is added but
+   * not handled (a catch-all case would silence that check).
+   */
+  def columnType: ColumnType = expression match {
+    case _: DimensionExpression => ColumnType.Dimension
+    case _: MeasureExpression => ColumnType.Measure
+  }
+
+  /**
+   * Builds the metadata (column type name and expression text) for this column.
+   *
+   * @throws MetricViewValidationException if the expression's length exceeds
+   *                                       `Constants.MAXIMUM_PROPERTY_SIZE`
+   */
+  def getColumnMetadata: ColumnMetadata = {
+    val expr = expression.expr
+    if (expr.length > Constants.MAXIMUM_PROPERTY_SIZE) {
+      throw MetricViewValidationException(
+        s"Expression length ${expr.length} exceeds maximum allowed size " +
+          s"${Constants.MAXIMUM_PROPERTY_SIZE} for column '$name'"
+      )
+    }
+    ColumnMetadata(columnType.toString, expr)
+  }
+}
+
+private[sql] object ColumnType extends Enumeration {
+  type ColumnType = Value
+  // The lowercase names are the strings written into column metadata via toString.
+  val Dimension: ColumnType = Value("dimension")
+  val Measure: ColumnType = Value("measure")
+
+  /**
+   * Looks up a column type by name, ignoring case.
+   *
+   * @param columnType the name to look up
+   * @throws MetricViewFromProtoException if no value has that name
+   */
+  def fromString(columnType: String): ColumnType =
+    values
+      .find(_.toString.equalsIgnoreCase(columnType))
+      .getOrElse(throw MetricViewFromProtoException(s"Unsupported column type: $columnType"))
+}
+
+/**
+ * Metadata describing one metric view column, stored under the keys `metric_view.type` and
+ * `metric_view.expr`. Unknown keys are ignored when reading; absent values are not written.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(Include.NON_ABSENT)
+private[sql] case class ColumnMetadata(
+    // "type" -> "metric_view.type"
+    @JsonProperty(value = Constants.COLUMN_TYPE_PROPERTY_KEY, required = true)
+    columnType: String,
+    // "expr" -> "metric_view.expr"
+    @JsonProperty(value = Constants.COLUMN_EXPR_PROPERTY_KEY, required = true)
+    expr: String)
+
+/** Reads just the `version` field of a YAML document; every other field is ignored. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+private[sql] case class YAMLVersion(version: String)
+
+/**
+ * Version-independent (canonical) form of a metric view.
+ *
+ * @param version the YAML format version of the definition
+ * @param from    the source rows are read from
+ * @param where   optional filter condition
+ * @param select  the dimension and measure columns
+ */
+private[sql] case class MetricView(
+    version: String,
+    from: Source,
+    where: Option[String] = None,
+    select: Seq[Column])
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala
new file mode 100644
index 000000000000..ced4c7e635e3
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewFactory.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import scala.util.control.NonFatal
+
+private[sql] object MetricViewFactory {
+  /**
+   * Parses a metric view YAML definition into its canonical [[MetricView]].
+   *
+   * The document is read twice: first only for its `version` field (other fields are
+   * ignored), then with the deserializer for that version.
+   *
+   * @param yamlContent the YAML text
+   * @return the canonical metric view
+   * @throws MetricViewValidationException if the version is missing or unsupported, or the
+   *                                       definition is invalid (e.g. an empty source)
+   * @throws MetricViewYAMLParsingException for any other failure while reading the YAML
+   */
+  def fromYAML(yamlContent: String): MetricView = {
+    try {
+      val yamlVersion =
+        YamlMapperProviderV01.mapperWithAllFields.readValue(yamlContent, classOf[YAMLVersion])
+      // readValue can return null (e.g. for a document holding only a null value), and
+      // `version` is null when the key is absent; report both as an invalid version rather
+      // than letting a NullPointerException escape.
+      Option(yamlVersion).flatMap(v => Option(v.version)) match {
+        case Some("0.1") =>
+          MetricViewYAMLDeserializerV01.parseYaml(yamlContent).toCanonical
+        case other =>
+          throw MetricViewValidationException(s"Invalid YAML version: ${other.orNull}")
+      }
+    } catch {
+      // Already-classified errors (including the version error above) pass through unchanged.
+      case e: MetricViewSerdeException =>
+        throw e
+      case NonFatal(e) =>
+        throw MetricViewYAMLParsingException(s"Failed to parse YAML: ${e.getMessage}", Some(e))
+    }
+  }
+
+  /**
+   * Serializes a canonical [[MetricView]] to YAML in the format of its `version`.
+   *
+   * @param metricView the canonical metric view
+   * @return the YAML text
+   * @throws MetricViewValidationException if the version is not supported
+   * @throws MetricViewYAMLParsingException for any other failure while writing the YAML
+   */
+  def toYAML(metricView: MetricView): String = {
+    try {
+      // Dispatch on the canonical version up front so an unsupported version is reported as a
+      // validation error, consistent with fromYAML, and no downcast is needed.
+      metricView.version match {
+        case "0.1" =>
+          MetricViewYAMLSerializerV01.toYaml(MetricViewV01.fromCanonical(metricView))
+        case _ =>
+          throw MetricViewValidationException(s"Invalid YAML version: ${metricView.version}")
+      }
+    } catch {
+      case e: MetricViewSerdeException =>
+        throw e
+      case NonFatal(e) =>
+        throw MetricViewYAMLParsingException(
+          s"Failed to serialize to YAML: ${e.getMessage}",
+          Some(e)
+        )
+    }
+  }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala
new file mode 100644
index 000000000000..137e34e53a9d
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeBase.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.{YAMLFactory, YAMLGenerator}
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.yaml.snakeyaml.DumperOptions
+
+/** Shared constants for metric view serialization. */
+private[sql] object Constants {
+  /** Upper bound (1 KiB) compared against a column expression's String length. */
+  final val MAXIMUM_PROPERTY_SIZE: Int = 1024
+  // The keys below are left untyped so they stay compile-time constants usable in
+  // annotations such as @JsonProperty.
+  /** Metadata key holding a column's type name. */
+  final val COLUMN_TYPE_PROPERTY_KEY = "metric_view.type"
+  /** Metadata key holding a column's expression text. */
+  final val COLUMN_EXPR_PROPERTY_KEY = "metric_view.expr"
+}
+
+private[sql] trait YamlMapperProviderBase {
+  /**
+   * Returns the Jackson [[ObjectMapper]] used to read and write metric view YAML.
+   *
+   * Every parse and serialize call goes through this method, and building a YAMLFactory plus
+   * registering the Scala module is comparatively expensive, so the mapper is built once per
+   * provider (on first access) and reused. The instance is shared: callers must not
+   * reconfigure it.
+   */
+  def mapperWithAllFields: ObjectMapper = sharedMapper
+
+  // Lazily built so the configuration below runs once per provider object.
+  private lazy val sharedMapper: ObjectMapper = buildMapper()
+
+  /** Builds a YAML mapper producing block-style, minimally quoted output with Scala support. */
+  private def buildMapper(): ObjectMapper = {
+    val options = new DumperOptions()
+    // Set flow style to BLOCK for better readability (each key-value pair on separate lines)
+    options.setDefaultFlowStyle(DumperOptions.FlowStyle.BLOCK)
+    // Set indentation to 2 spaces
+    options.setIndent(2)
+    // Set indicator indentation to 2 spaces for list/dict indicators
+    options.setIndicatorIndent(2)
+    // Enable indentation with indicators for better readability
+    options.setIndentWithIndicator(true)
+    // Disable pretty flow so that it doesn't add unnecessary newlines after dashes
+    options.setPrettyFlow(false)
+
+    val yamlFactory = YAMLFactory.builder()
+      // Minimize quotes around strings when possible
+      .configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true)
+      // Don't force numbers to be quoted as strings (preserve numeric types)
+      .configure(YAMLGenerator.Feature.ALWAYS_QUOTE_NUMBERS_AS_STRINGS, false)
+      // Don't write YAML document start marker (---)
+      .configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, false)
+      // Disable native type IDs and use explicit type instead
+      .configure(YAMLGenerator.Feature.USE_NATIVE_TYPE_ID, false)
+      .dumperOptions(options)
+      .build()
+
+    val mapper = new ObjectMapper(yamlFactory)
+      // Exclude null/absent values and empty strings/collections from serialized output.
+      // NON_EMPTY already covers NON_NULL, and each setDefaultPropertyInclusion call replaces
+      // the previous setting, so a single call is sufficient.
+      .setDefaultPropertyInclusion(JsonInclude.Include.NON_EMPTY)
+
+    mapper.registerModule(DefaultScalaModule)
+    mapper
+  }
+}
+
+/**
+ * Shared YAML reading logic for version-specific metric view models. Implementations supply
+ * the mapper provider and the concrete class to deserialize into.
+ *
+ * @tparam T the version-specific model type produced by [[parseYaml]]
+ */
+private[sql] trait BaseMetricViewYAMLDeserializer[T] {
+  /** Supplies the YAML mapper; subclasses choose a version-specific provider. */
+  protected def yamlMapperProvider: YamlMapperProviderBase
+
+  /** The concrete class YAML documents are deserialized into. */
+  protected def getTargetClass: Class[T]
+
+  /**
+   * Deserializes YAML text into a `T`.
+   *
+   * @param yamlContent the YAML text
+   * @return the parsed model
+   * @throws MetricViewYAMLParsingException wrapping any non-fatal failure while reading
+   */
+  def parseYaml(yamlContent: String): T = {
+    try {
+      val mapper = yamlMapperProvider.mapperWithAllFields
+      mapper.readValue(yamlContent, getTargetClass)
+    } catch {
+      case NonFatal(cause) =>
+        throw MetricViewYAMLParsingException(
+          s"Failed to parse YAML: ${cause.getMessage}", Some(cause))
+    }
+  }
+}
+
+/**
+ * Shared YAML writing logic for version-specific metric view models.
+ *
+ * @tparam T the version-specific model type accepted by [[toYaml]]
+ */
+private[sql] trait BaseMetricViewYAMLSerializer[T] {
+  /** Supplies the YAML mapper used for writing. */
+  protected def yamlMapperProvider: YamlMapperProviderBase
+
+  /**
+   * Serializes `obj` to YAML text.
+   *
+   * @throws MetricViewYAMLParsingException wrapping any non-fatal failure while writing
+   */
+  def toYaml(obj: T): String = {
+    try {
+      val writer = yamlMapperProvider.mapperWithAllFields
+      writer.writeValueAsString(obj)
+    } catch {
+      case NonFatal(cause) =>
+        throw MetricViewYAMLParsingException(
+          s"Failed to serialize to YAML: ${cause.getMessage}", Some(cause))
+    }
+  }
+}
+
+/** Common shape of a version-specific column: a name plus an expression string. */
+private[sql] trait ColumnBase {
+  def name: String
+  def expr: String
+
+  /**
+   * Converts this column to its canonical form.
+   *
+   * @param ordinal     the column's position within the metric view
+   * @param isDimension true for a dimension, false for a measure
+   * @return the canonical column
+   */
+  def toCanonical(ordinal: Int, isDimension: Boolean): Column = {
+    val expression: Expression =
+      if (isDimension) DimensionExpression(expr) else MeasureExpression(expr)
+    Column(name = name, expression = expression, ordinal = ordinal)
+  }
+}
+
+/**
+ * Common shape of a version-specific metric view model.
+ *
+ * Different YAML versions may describe a metric view with different syntax, but all of them
+ * must be convertible to the same canonical [[MetricView]]. For example, a later version might
+ * let `source` describe multiple sources (e.g. joins) instead of the single source used today;
+ * the canonical form should be able to represent both syntaxes.
+ */
+private[sql] trait MetricViewBase {
+  def version: String
+  def source: String
+  def filter: Option[String]
+  def dimensions: Seq[ColumnBase]
+  def measures: Seq[ColumnBase]
+
+  /**
+   * Converts this model to the canonical [[MetricView]]. Dimensions come first with ordinals
+   * `0 until dimensions.length`; measures follow, numbered from `dimensions.length`.
+   */
+  def toCanonical: MetricView = {
+    val dimensionCount = dimensions.length
+    val canonicalDimensions = dimensions.zipWithIndex.map { case (column, position) =>
+      column.toCanonical(position, isDimension = true)
+    }
+    val canonicalMeasures = measures.zipWithIndex.map { case (column, position) =>
+      column.toCanonical(dimensionCount + position, isDimension = false)
+    }
+    MetricView(
+      version = version,
+      from = Source(source),
+      where = filter,
+      select = canonicalDimensions ++ canonicalMeasures)
+  }
+}
+
+private[sql] object MetricViewBase {
+  /**
+   * Builds the version-specific model that matches `canonical.version`.
+   *
+   * @param canonical the canonical MetricView to convert from
+   * @return the version-specific MetricView
+   * @throws IllegalArgumentException if the version is not supported
+   */
+  def fromCanonical(canonical: MetricView): MetricViewBase = canonical.version match {
+    case "0.1" => MetricViewV01.fromCanonical(canonical)
+    case unsupported => throw new IllegalArgumentException(s"Unsupported version: $unsupported")
+  }
+}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala
new file mode 100644
index 000000000000..ee914785b612
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/metricview/serde/MetricViewSerDeV01.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import com.fasterxml.jackson.annotation.JsonProperty
+
+/** A v0.1 column entry: `name` and `expr` are both required in the YAML. */
+private[sql] case class ColumnV01(
+    @JsonProperty(required = true) name: String,
+    @JsonProperty(required = true) expr: String
+) extends ColumnBase
+
+private[sql] object ColumnV01 {
+  /**
+   * Converts a canonical [[Column]] to its v0.1 form.
+   *
+   * The column's kind is not stored here. In v0.1 it is conveyed by whether the column is
+   * listed under `dimensions` or `measures`. The text comes straight from `Expression.expr`,
+   * which every expression type provides, so no per-type match (and no unreachable
+   * fallback) is needed.
+   */
+  def fromCanonical(canonical: Column): ColumnV01 =
+    ColumnV01(name = canonical.name, expr = canonical.expression.expr)
+}
+
+/** The v0.1 YAML model of a metric view. `version` and `source` are required. */
+private[sql] case class MetricViewV01(
+    @JsonProperty(required = true) version: String,
+    @JsonProperty(required = true) source: String,
+    filter: Option[String] = None,
+    dimensions: Seq[ColumnV01] = Seq.empty,
+    measures: Seq[ColumnV01] = Seq.empty) extends MetricViewBase
+
+private[sql] object MetricViewV01 {
+  /**
+   * Converts a canonical [[MetricView]] into the v0.1 model.
+   *
+   * Canonical columns are split by expression type into `dimensions` and `measures`. Each
+   * group keeps its relative order from `canonical.select`.
+   */
+  def fromCanonical(canonical: MetricView): MetricViewV01 = {
+    val source = canonical.from.toString
+    val filter = canonical.where
+    // Type patterns on the column's expression instead of isInstanceOf guards.
+    val dimensions = canonical.select.collect {
+      case column @ Column(_, _: DimensionExpression, _) => ColumnV01.fromCanonical(column)
+    }
+    val measures = canonical.select.collect {
+      case column @ Column(_, _: MeasureExpression, _) => ColumnV01.fromCanonical(column)
+    }
+    MetricViewV01(
+      version = canonical.version,
+      source = source,
+      filter = filter,
+      dimensions = dimensions,
+      measures = measures
+    )
+  }
+}
+
+/** Mapper provider for v0.1; uses the base mapper configuration unchanged. */
+private[sql] object YamlMapperProviderV01 extends YamlMapperProviderBase
+
+/** Reads v0.1 metric view YAML into [[MetricViewV01]]. */
+private[sql] object MetricViewYAMLDeserializerV01
+  extends BaseMetricViewYAMLDeserializer[MetricViewV01] {
+  override protected def getTargetClass: Class[MetricViewV01] = classOf[MetricViewV01]
+
+  override protected def yamlMapperProvider: YamlMapperProviderBase = YamlMapperProviderV01
+}
+
+/** Writes [[MetricViewV01]] models as v0.1 YAML. */
+private[sql] object MetricViewYAMLSerializerV01
+  extends BaseMetricViewYAMLSerializer[MetricViewV01] {
+  override protected def yamlMapperProvider: YamlMapperProviderBase = YamlMapperProviderV01
+}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala
new file mode 100644
index 000000000000..77d2b1cfbf3e
--- /dev/null
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/metricview/serde/MetricViewFactorySuite.scala
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metricview.serde
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Test suite for [[MetricViewFactory]] YAML serialization and deserialization: parsing,
+ * serialization, round trips, and validation failures.
+ */
+class MetricViewFactorySuite extends SparkFunSuite {
+
+  test("fromYAML - parse basic metric view with asset source") {
+    val yaml =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |measures:
+        |  - name: total_revenue
+        |    expr: SUM(revenue)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    assert(metricView.version === "0.1")
+    assert(metricView.from.isInstanceOf[AssetSource])
+    assert(metricView.from.asInstanceOf[AssetSource].name === "my_table")
+    assert(metricView.where.isEmpty)
+    assert(metricView.select.length === 2)
+
+    val customerIdCol = metricView.select(0)
+    assert(customerIdCol.name === "customer_id")
+    assert(customerIdCol.expression.isInstanceOf[DimensionExpression])
+    assert(customerIdCol.expression.expr === "customer_id")
+
+    val revenueCol = metricView.select(1)
+    assert(revenueCol.name === "total_revenue")
+    assert(revenueCol.expression.isInstanceOf[MeasureExpression])
+    assert(revenueCol.expression.expr === "SUM(revenue)")
+  }
+
+  test("fromYAML - parse metric view with SQL source") {
+    val yaml =
+      """version: "0.1"
+        |source: SELECT * FROM my_table WHERE year = 2024
+        |dimensions:
+        |  - name: product_id
+        |    expr: product_id
+        |measures:
+        |  - name: quantity_sold
+        |    expr: SUM(quantity)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    assert(metricView.from.isInstanceOf[SQLSource])
+    assert(metricView.from.asInstanceOf[SQLSource].sql ===
+      "SELECT * FROM my_table WHERE year = 2024")
+    assert(metricView.select.length === 2)
+  }
+
+  test("fromYAML - parse metric view with filter clause") {
+    val yaml =
+      """version: "0.1"
+        |source: sales_data
+        |filter: year >= 2020 AND status = 'completed'
+        |dimensions:
+        |  - name: region
+        |    expr: region
+        |measures:
+        |  - name: sales
+        |    expr: SUM(amount)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    assert(metricView.where.isDefined)
+    assert(metricView.where.get === "year >= 2020 AND status = 'completed'")
+  }
+
+  test("fromYAML - parse metric view with multiple dimensions and measures") {
+    val yaml =
+      """version: "0.1"
+        |source: transactions
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |  - name: product_id
+        |    expr: product_id
+        |  - name: region
+        |    expr: region
+        |measures:
+        |  - name: total_revenue
+        |    expr: SUM(revenue)
+        |  - name: avg_revenue
+        |    expr: AVG(revenue)
+        |  - name: transaction_count
+        |    expr: COUNT(*)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    assert(metricView.select.length === 6)
+
+    // Check dimensions
+    assert(metricView.select(0).expression.isInstanceOf[DimensionExpression])
+    assert(metricView.select(1).expression.isInstanceOf[DimensionExpression])
+    assert(metricView.select(2).expression.isInstanceOf[DimensionExpression])
+
+    // Check measures
+    assert(metricView.select(3).expression.isInstanceOf[MeasureExpression])
+    assert(metricView.select(4).expression.isInstanceOf[MeasureExpression])
+    assert(metricView.select(5).expression.isInstanceOf[MeasureExpression])
+  }
+
+  test("fromYAML - invalid YAML version") {
+    val yaml =
+      """version: "99.9"
+        |source: my_table
+        |dimensions:
+        |  - name: id
+        |    expr: id
+        |""".stripMargin
+
+    val exception = intercept[MetricViewValidationException] {
+      MetricViewFactory.fromYAML(yaml)
+    }
+    assert(exception.getMessage.contains("Invalid YAML version: 99.9"))
+  }
+
+  test("fromYAML - malformed YAML") {
+    val yaml = """this is not valid yaml: [unclosed bracket"""
+
+    val exception = intercept[MetricViewYAMLParsingException] {
+      MetricViewFactory.fromYAML(yaml)
+    }
+    assert(exception.getMessage.contains("Failed to parse YAML"))
+  }
+
+  test("toYAML - serialize basic metric view") {
+    val metricView = MetricView(
+      version = "0.1",
+      from = AssetSource("my_table"),
+      where = None,
+      select = Seq(
+        Column("customer_id", DimensionExpression("customer_id"), 0),
+        Column("total_revenue", MeasureExpression("SUM(revenue)"), 1)
+      )
+    )
+
+    val yaml = MetricViewFactory.toYAML(metricView)
+
+    assert(yaml.contains("version: 0.1") || yaml.contains("version: \"0.1\""))
+    assert(yaml.contains("source: my_table"))
+    assert(yaml.contains("customer_id"))
+    assert(yaml.contains("total_revenue"))
+
+    // Verify it can be parsed back
+    val reparsed = MetricViewFactory.fromYAML(yaml)
+    assert(reparsed.version === "0.1")
+    assert(reparsed.from.asInstanceOf[AssetSource].name === "my_table")
+  }
+
+  test("toYAML - serialize metric view with SQL source") {
+    val metricView = MetricView(
+      version = "0.1",
+      from = SQLSource("SELECT * FROM table WHERE id > 100"),
+      where = None,
+      select = Seq(
+        Column("id", DimensionExpression("id"), 0)
+      )
+    )
+
+    val yaml = MetricViewFactory.toYAML(metricView)
+
+    assert(yaml.contains("source: SELECT * FROM table WHERE id > 100"))
+  }
+
+  test("toYAML - serialize metric view with filter clause") {
+    val metricView = MetricView(
+      version = "0.1",
+      from = AssetSource("sales"),
+      where = Some("year >= 2020"),
+      select = Seq(
+        Column("region", DimensionExpression("region"), 0),
+        Column("sales", MeasureExpression("SUM(amount)"), 1)
+      )
+    )
+
+    val yaml = MetricViewFactory.toYAML(metricView)
+
+    assert(yaml.contains("year >= 2020"))
+
+    // Verify it can be parsed back
+    val reparsed = MetricViewFactory.fromYAML(yaml)
+    assert(reparsed.where.isDefined)
+    assert(reparsed.where.get === "year >= 2020")
+  }
+
+  test("roundtrip - fromYAML and toYAML preserve data") {
+    val originalYaml =
+      """version: "0.1"
+        |source: sales_table
+        |filter: status = 'active'
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |  - name: product_name
+        |    expr: product_name
+        |measures:
+        |  - name: total_revenue
+        |    expr: SUM(revenue)
+        |  - name: order_count
+        |    expr: COUNT(order_id)
+        |""".stripMargin
+
+    // Parse the YAML
+    val metricView = MetricViewFactory.fromYAML(originalYaml)
+
+    // Serialize it back
+    val serializedYaml = MetricViewFactory.toYAML(metricView)
+
+    // Parse again to verify
+    val reparsedMetricView = MetricViewFactory.fromYAML(serializedYaml)
+
+    // Verify all fields match
+    assert(reparsedMetricView.version === metricView.version)
+    assert(reparsedMetricView.from === metricView.from)
+    assert(reparsedMetricView.where === metricView.where)
+    assert(reparsedMetricView.select.length === metricView.select.length)
+
+    reparsedMetricView.select.zip(metricView.select).foreach { case (col1, col2) =>
+      assert(col1.name === col2.name)
+      assert(col1.expression.expr === col2.expression.expr)
+      assert(col1.expression.getClass === col2.expression.getClass)
+    }
+  }
+
+  test("roundtrip - SQL source preservation") {
+    val originalYaml =
+      """version: "0.1"
+        |source: SELECT * FROM my_table WHERE year = 2024
+        |dimensions:
+        |  - name: id
+        |    expr: id
+        |measures:
+        |  - name: total
+        |    expr: SUM(value)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(originalYaml)
+    val serializedYaml = MetricViewFactory.toYAML(metricView)
+    val reparsedMetricView = MetricViewFactory.fromYAML(serializedYaml)
+
+    assert(reparsedMetricView.from.isInstanceOf[SQLSource])
+    assert(reparsedMetricView.from.asInstanceOf[SQLSource].sql ===
+      "SELECT * FROM my_table WHERE year = 2024")
+  }
+
+  test("column ordinals are preserved") {
+    val yaml =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: col1
+        |    expr: col1
+        |  - name: col2
+        |    expr: col2
+        |measures:
+        |  - name: col3
+        |    expr: SUM(value)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    assert(metricView.select(0).ordinal === 0)
+    assert(metricView.select(1).ordinal === 1)
+    assert(metricView.select(2).ordinal === 2)
+  }
+
+  test("column metadata extraction") {
+    val yaml =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: customer_id
+        |    expr: customer_id
+        |measures:
+        |  - name: revenue
+        |    expr: SUM(amount)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    val dimensionMetadata = metricView.select(0).getColumnMetadata
+    assert(dimensionMetadata.columnType === "dimension")
+    assert(dimensionMetadata.expr === "customer_id")
+
+    val measureMetadata = metricView.select(1).getColumnMetadata
+    assert(measureMetadata.columnType === "measure")
+    assert(measureMetadata.expr === "SUM(amount)")
+  }
+
+  test("empty source validation") {
+    // Both dimensions and measures are present so the empty source is the only problem.
+    val yaml =
+      """version: "0.1"
+        |source: ""
+        |dimensions:
+        |  - name: id
+        |    expr: id
+        |measures:
+        |  - name: total
+        |    expr: SUM(value)
+        |""".stripMargin
+
+    // Pin the specific validation error rather than accepting any exception.
+    val exception = intercept[MetricViewValidationException] {
+      MetricViewFactory.fromYAML(yaml)
+    }
+    assert(exception.getMessage.contains("Source cannot be empty"))
+  }
+
+  test("complex SQL expressions in measures") {
+    val yaml =
+      """version: "0.1"
+        |source: transactions
+        |dimensions:
+        |  - name: date
+        |    expr: DATE(timestamp)
+        |measures:
+        |  - name: weighted_avg
+        |    expr: SUM(amount * weight) / SUM(weight)
+        |  - name: distinct_customers
+        |    expr: COUNT(DISTINCT customer_id)
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    assert(metricView.select.length === 3)
+    assert(metricView.select(0).expression.expr === "DATE(timestamp)")
+    assert(metricView.select(1).expression.expr === "SUM(amount * weight) / SUM(weight)")
+    assert(metricView.select(2).expression.expr === "COUNT(DISTINCT customer_id)")
+  }
+
+  test("special characters in column names and expressions") {
+    val yaml =
+      """version: "0.1"
+        |source: my_table
+        |dimensions:
+        |  - name: "customer.id"
+        |    expr: "`customer.id`"
+        |measures:
+        |  - name: "revenue_$"
+        |    expr: "SUM(`revenue_$`)"
+        |""".stripMargin
+
+    val metricView = MetricViewFactory.fromYAML(yaml)
+
+    assert(metricView.select(0).name === "customer.id")
+    assert(metricView.select(0).expression.expr === "`customer.id`")
+    assert(metricView.select(1).name === "revenue_$")
+    assert(metricView.select(1).expression.expr === "SUM(`revenue_$`)")
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]