This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 49aaed209dc8 [SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses
49aaed209dc8 is described below
commit 49aaed209dc884f4022bf7f507ce580ea6b4225d
Author: AnishMahto <[email protected]>
AuthorDate: Tue May 26 15:30:16 2026 +0800
[SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses
Approved AutoCDC SPIP:
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
--------
### What changes were proposed in this pull request?
Introduce dataclasses for the unresolved AutoCDC flow (`AutoCdcFlow`) and
the resolved AutoCDC flow (`AutoCdcMergeFlow`). Add wiring to resolve an
`AutoCdcFlow` into an `AutoCdcMergeFlow` during analysis.
A small refactor was additionally made on the `UnresolvedFlow` and
`ResolvedFlow` class hierarchy.
### Why are the changes needed?
Support AutoCDC flow registration and analysis. AutoCDC flow execution will
be supported in a future PR. Previously, an `UnresolvedFlow` also
always represented an untyped flow, i.e. a flow whose execution type
(streaming, append-once, etc.) is not yet known.
`AutoCdcFlow` is a specialized flow with support for only streaming flows,
hence it represents a flow whose execution-type we know at construction. It is
still unresolved at registration time, and needs to go through resolution to
determine its position in the DAG and its input/output schemas.
Hence we introduce the intermediary child `UntypedFlow` for
`UnresolvedFlow`, which all previous flows are classified as during
registration. An `AutoCdcFlow` directly implements `UnresolvedFlow` (skipping
`UntypedFlow` in its inheritance chain) because it is not untyped.
### Does this PR introduce _any_ user-facing change?
No, the AutoCDC feature is not released anywhere yet.
### How was this patch tested?
`ConnectValidPipelineSuite` and `AutoCdcFlowSuite`
### Was this patch authored or co-authored using generative AI tooling?
Co-authored.
Generated-by: Claude-Opus-4.7-thinking-xhigh
Closes #56042 from AnishMahto/SPARK-56956-introduce-flow-data-classes.
Authored-by: AnishMahto <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 27 +-
.../sql/connect/pipelines/PipelinesHandler.scala | 4 +-
.../spark/sql/pipelines/autocdc/ChangeArgs.scala | 2 +-
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 16 +-
.../graph/CoreDataflowNodeProcessor.scala | 15 +-
.../apache/spark/sql/pipelines/graph/Flow.scala | 192 ++++++-
.../pipelines/graph/GraphRegistrationContext.scala | 2 +-
.../sql/pipelines/graph/GraphValidations.scala | 41 +-
.../graph/SqlGraphRegistrationContext.scala | 10 +-
.../sql/pipelines/autocdc/AutoCdcFlowSuite.scala | 601 +++++++++++++++++++++
.../autocdc/Scd1BatchProcessorSuite.scala | 77 ---
.../graph/ConnectInvalidPipelineSuite.scala | 217 ++++++++
.../graph/ConnectValidPipelineSuite.scala | 33 ++
.../utils/TestGraphRegistrationContext.scala | 8 +-
14 files changed, 1140 insertions(+), 105 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index d907d027b0ed..9c9a657bc6e9 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -203,6 +203,12 @@
],
"sqlState" : "22023"
},
+ "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : {
+ "message" : [
+ "Using <caseSensitivity> column name comparison, the AutoCDC key column
`<keyColumnName>` is not present in the flow's selected source schema. AutoCDC
requires every key column to be present in the source change-data feed and
retained by any configured column selection."
+ ],
+ "sqlState" : "22023"
+ },
"AUTOCDC_MICROBATCH_VALIDATION" : {
"message" : [
"AutoCDC flow on table <tableName> in batch <batchId> failed microbatch
validation."
@@ -232,12 +238,24 @@
],
"sqlState" : "42703"
},
- "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : {
+ "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
+ "message" : [
+ "Invalid AutoCDC destination <tableName> with multiple flows: <flows>.
An AutoCDC target table must have exactly one flow writing to it."
+ ],
+ "sqlState" : "42000"
+ },
+ "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
"message" : [
- "Using <caseSensitivity> column name comparison, the column
`<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC
column name `<reservedColumnName>`. Rename or remove the column."
+ "The column `<columnName>` in the <schemaName> schema collides with the
reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using
<caseSensitivity> column name comparison). Rename or remove the column."
],
"sqlState" : "42710"
},
+ "AUTOCDC_SCD2_NOT_SUPPORTED" : {
+ "message" : [
+ "AutoCDC flows do not currently support SCD Type 2 transformations."
+ ],
+ "sqlState" : "0A000"
+ },
"AVRO_CANNOT_WRITE_NULL_FIELD" : {
"message" : [
"Cannot write null value for field <name> defined as non-null Avro data
type <dataType>.",
@@ -3710,6 +3728,11 @@
"Flow <flowIdentifier> returns an invalid relation type."
],
"subClass" : {
+ "AUTOCDC_RELATION_FOR_TEMPORARY_VIEW" : {
+ "message" : [
+ "AutoCDC flows must target a streaming table because their
reconciliation semantics require a streaming-table sink, but the flow
<flowIdentifier> attempts to write an AutoCDC relation to the temporary view
<viewIdentifier>."
+ ]
+ },
"BATCH_RELATION_FOR_STREAMING_TABLE" : {
"message" : [
"Streaming tables may only be defined by streaming relations, but
the flow <flowIdentifier> attempts to write a batch relation to the streaming
table <tableIdentifier>. Consider using the STREAM operator in Spark-SQL to
convert the batch relation into a streaming relation, or populating the
streaming table with an append once-flow instead."
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index ee19b83c5162..04dbc1a45506 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.execution.command.{ShowCatalogsCommand,
ShowNamespacesCommand}
import org.apache.spark.sql.pipelines.Language.Python
import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
-import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis,
GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables,
PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink,
SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter,
TemporaryView, UnresolvedFlow}
+import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis,
GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables,
PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink,
SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter,
TemporaryView, UntypedFlow}
import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
import org.apache.spark.sql.types.StructType
@@ -371,7 +371,7 @@ private[connect] object PipelinesHandler extends Logging {
case proto.PipelineCommand.DefineFlow.DetailsCase.RELATION_FLOW_DETAILS
=>
val relationFlowDetails = flow.getRelationFlowDetails
graphElementRegistry.registerFlow(
- UnresolvedFlow(
+ UntypedFlow(
identifier = flowIdentifier,
destinationIdentifier = destinationIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
index c17c89967baa..b975e06807f5 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -120,7 +120,7 @@ object ColumnSelection {
}
/** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */
-private[autocdc] object CaseSensitivityLabels {
+private[pipelines] object CaseSensitivityLabels {
val CaseSensitive: String = "case-sensitive"
val CaseInsensitive: String = "case-insensitive"
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 32aebc8924c0..aaea3b63e9ef 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -416,9 +416,15 @@ case class Scd1BatchProcessor(
}
object Scd1BatchProcessor {
- // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP
AutoCDC processing.
- private[autocdc] val winningRowColName: String =
"__spark_autocdc_winning_row"
- private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata"
+ /**
+ * Reserved column-name prefix for internal SDP AutoCDC processing. Source
change-data-feed
+ * dataframes must not contain any columns starting with this prefix; the
invariant is
+ * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]]
construction.
+ */
+ private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"
+
+ private[autocdc] val winningRowColName: String =
s"${reservedColumnNamePrefix}winning_row"
+ private[pipelines] val cdcMetadataColName: String =
s"${reservedColumnNamePrefix}metadata"
private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
@@ -434,7 +440,7 @@ object Scd1BatchProcessor {
/**
* Schema of the CDC metadata struct column for SCD1.
*/
- private def cdcMetadataColSchema(sequencingType: DataType): StructType =
+ private[pipelines] def cdcMetadataColSchema(sequencingType: DataType):
StructType =
StructType(
Seq(
// The sequencing of the event if it represents a delete, null
otherwise.
@@ -448,7 +454,7 @@ object Scd1BatchProcessor {
* Construct the CDC metadata struct column for SCD1, following the exact
schema and field
* ordering defined by [[cdcMetadataColSchema]].
*/
- private[autocdc] def constructCdcMetadataCol(
+ private[pipelines] def constructCdcMetadataCol(
deleteSequence: Column,
upsertSequence: Column,
sequencingType: DataType): Column = {
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
index 38fde0bfec4a..66f2995ee02d 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
@@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
} else {
f
}
- convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult)
+ resolveFlow(flowToResolve, maybeNewFuncResult)
// If the flow failed due to an UnresolvedDatasetException, it means
that one of the
// flow's inputs wasn't available. After other flows are resolved,
these inputs
@@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) {
}
}
- private def convertResolvedToTypedFlow(
+ private def resolveFlow(
flow: UnresolvedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
+ flow match {
+ case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
+ case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf,
funcResult)
+ }
+ }
+
+ private def transformUntypedFlowToResolvedFlow(
+ flow: UntypedFlow,
+ funcResult: FlowFunctionResult): ResolvedFlow = {
flow match {
case _ if flow.once => new AppendOnceFlow(flow, funcResult)
case _ if funcResult.dataFrame.get.isStreaming =>
@@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
// then get their results overwritten.
val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size >
1
new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend)
- case _: UnresolvedFlow => new CompleteFlow(flow, funcResult)
+ case _ => new CompleteFlow(flow, funcResult)
}
}
}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index e329308502f0..04ef8d3186c5 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -20,11 +20,19 @@ package org.apache.spark.sql.pipelines.graph
import scala.util.Try
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{functions => F, AnalysisException, Column}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.pipelines.AnalysisWarning
+import org.apache.spark.sql.pipelines.autocdc.{
+ CaseSensitivityLabels,
+ ChangeArgs,
+ ColumnSelection,
+ Scd1BatchProcessor,
+ ScdType
+}
import org.apache.spark.sql.pipelines.util.InputReadOptions
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
/**
* Contains the catalog and database context information for query execution.
@@ -121,7 +129,21 @@ case class FlowFunctionResult(
}
/** A [[Flow]] whose output schema and dependencies aren't known. */
-case class UnresolvedFlow(
+sealed trait UnresolvedFlow extends Flow {
+ /** Returns a copy of this flow with the given SQL confs overriding the
existing ones. */
+ def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow
+}
+
+/**
+ * An [[UnresolvedFlow]] whose execution-type has not yet been determined.
+ *
+ * In some cases, we know the execution-type for an [[UnresolvedFlow]] even
before flow analysis
+ * and resolution. For example, an AutoCdcFlow is a special
unresolved-but-typed flow; we know a
+ * flow will be an AutoCDC flow immediately on construction, because it has
its own special
+ * registration API. Such flows are considered "typed flows", but there isn't
any semantic reason
+ * yet to explicitly introduce a `TypedFlow` trait/class.
+ */
+case class UntypedFlow(
identifier: TableIdentifier,
destinationIdentifier: TableIdentifier,
func: FlowFunction,
@@ -129,7 +151,34 @@ case class UnresolvedFlow(
sqlConf: Map[String, String],
override val once: Boolean,
override val origin: QueryOrigin
-) extends Flow
+) extends UnresolvedFlow {
+ override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow =
+ copy(sqlConf = newSqlConf)
+}
+
+/**
+ * An unresolved but typed flow that applies a CDC event stream to a target
table via MERGE.
+ *
+ * [[AutoCdcFlow]] is a typed flow because it is only supported for streaming,
and not as a once
+ * flow. Therefore by definition it is a streaming-type flow.
+ *
+ * In the future once-support for [[AutoCdcFlow]] may be added.
+ */
+case class AutoCdcFlow(
+ identifier: TableIdentifier,
+ destinationIdentifier: TableIdentifier,
+ func: FlowFunction,
+ queryContext: QueryContext,
+ sqlConf: Map[String, String] = Map.empty,
+ comment: Option[String] = None,
+ override val origin: QueryOrigin,
+ changeArgs: ChangeArgs
+) extends UnresolvedFlow {
+ override val once: Boolean = false
+
+ override def withSqlConf(newSqlConf: Map[String, String]): AutoCdcFlow =
+ copy(sqlConf = newSqlConf)
+}
/**
* A [[Flow]] whose flow function has been invoked, meaning either:
@@ -194,3 +243,140 @@ class AppendOnceFlow(
override val once = true
}
+
+/**
+ * A resolved flow that applies a CDC event stream to a target table via
MERGE, in accordance with
+ * the configured [[flow.changeArgs]].
+ */
+class AutoCdcMergeFlow(
+ val flow: AutoCdcFlow,
+ val funcResult: FlowFunctionResult
+) extends ResolvedFlow {
+ requireReservedPrefixAbsentInSourceColumns()
+
+ def changeArgs: ChangeArgs = flow.changeArgs
+
+ /** The user-selected projection of [[df.schema]] (i.e. before the SCD
metadata column). */
+ private val userSelectedSchema: StructType = {
+ val selectedSchema = ColumnSelection.applyToSchema(
+ schemaName = "changeDataFeed",
+ schema = df.schema,
+ columnSelection = changeArgs.columnSelection,
+ caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
+ )
+ // AutoCDC flows require all key columns to be present in the target
table, to adhere to SCD
+ // semantics.
+ requireKeysPresentInSelectedSchema(selectedSchema)
+ selectedSchema
+ }
+
+ /** The DataType of the sequencing expression, derived once from the source
change feed. */
+ private val sequencingType: DataType =
+ df.select(changeArgs.sequencing).schema.head.dataType
+
+ /**
+ * Returns the augmented output schema of this flow, which can differ from
the schema of the
+ * source change-data-feed dataframe.
+ *
+ * The source dataframe's schema describes the incoming CDC events; the
augmented schema here
+ * applies the user-specified [[ColumnSelection]] and appends the
SCD-specific metadata
+ * columns that the AutoCDC MERGE engine projects onto the target table.
Downstream
+ * dependencies in the pipeline see this augmented schema.
+ */
+ override val schema: StructType = changeArgs.storedAsScdType match {
+ case ScdType.Type1 =>
+ // SCD1 produces a target table with all the user-selected output
columns and a projected
+ // CDC operational metadata column at the end.
+ StructType(
+ userSelectedSchema.fields :+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(sequencingType),
+ nullable = false
+ )
+ )
+ case ScdType.Type2 =>
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
+ messageParameters = Map.empty
+ )
+ }
+
+ /**
+ * Returns an empty dataframe whose schema matches
[[AutoCdcMergeFlow.schema]].
+ *
+ * Today, [[AutoCdcMergeFlow.load]] is not actually ever called during graph
analysis or
+ * execution. An AutoCdcMergeFlow can only be an input to a streaming table
(not an MV or
+ * persisted/temp view), and streaming tables take a [[VirtualTableInput]]
as input, not
+ * the producing [[Flow]] directly. [[VirtualTableInput]] overrides its own
[[load]] to do
+ * schema inference on its input flows, rather than a transitive
[[Flow.load]].
+ *
+ * The [[AutoCdcMergeFlow.load]] implementation exists solely for API
consistency.
+ */
+ override def load(readOptions: InputReadOptions): DataFrame =
changeArgs.storedAsScdType match {
+ case ScdType.Type1 =>
+ val userSelectedCols: Seq[Column] =
userSelectedSchema.fieldNames.toSeq.map(F.col)
+ val emptyCdcMetadataCol: Column =
Scd1BatchProcessor.constructCdcMetadataCol(
+ deleteSequence = F.lit(null),
+ upsertSequence = F.lit(null),
+ sequencingType = sequencingType
+ ).as(Scd1BatchProcessor.cdcMetadataColName)
+
+ df.select(userSelectedCols :+ emptyCdcMetadataCol: _*)
+ case ScdType.Type2 =>
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
+ messageParameters = Map.empty
+ )
+ }
+
+ /**
+ * Validate that the resolved source dataframe for the AutoCDC flow does not
contain any column
+ * names that use the reserved Spark AutoCDC prefix.
+ */
+ private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
+ val resolver = spark.sessionState.conf.resolver
+ val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix
+
+ def nameContainsReservedPrefix(name: String): Boolean = {
+ name.length >= reservedPrefix.length && resolver(
+ name.substring(0, reservedPrefix.length),
+ reservedPrefix
+ )
+ }
+
+ df.schema.fieldNames.find(nameContainsReservedPrefix).foreach {
conflictingColumnName =>
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+ messageParameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.of(
+ spark.sessionState.conf.caseSensitiveAnalysis
+ ),
+ "columnName" -> conflictingColumnName,
+ "schemaName" -> "changeDataFeed",
+ "reservedColumnNamePrefix" -> reservedPrefix
+ )
+ )
+ }
+ }
+
+ /**
+ * Validate all keys specified in changeArgs are actually present in the
user-selected schema.
+ */
+ private def requireKeysPresentInSelectedSchema(selectedSchema: StructType):
Unit = {
+ val resolver = spark.sessionState.conf.resolver
+
+ changeArgs.keys
+ .find(key => !selectedSchema.fieldNames.exists(name => resolver(name,
key.name)))
+ .foreach { missingKey =>
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+ messageParameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.of(
+ spark.sessionState.conf.caseSensitiveAnalysis
+ ),
+ "keyColumnName" -> missingKey.name
+ )
+ )
+ }
+ }
+}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
index dadda0561b19..970fdb4b70e9 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
@@ -59,7 +59,7 @@ class GraphRegistrationContext(
}
def registerFlow(flowDef: UnresolvedFlow): Unit = {
- flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf)
+ flows += flowDef.withSqlConf(defaultSqlConf ++ flowDef.sqlConf)
}
private def isEmpty: Boolean = {
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
index ade6ce0bad3c..e5ad3de44a8a 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
@@ -34,6 +34,24 @@ trait GraphValidations extends Logging {
*/
protected[pipelines] def validateMultiQueryTables(): Map[TableIdentifier,
Seq[Flow]] = {
val multiQueryTables = flowsTo.filter(_._2.size > 1)
+
+ // A multiflow table may not have an AutoCDC flow; AutoCDC flow targets
must be single query.
+ multiQueryTables
+ .find { case (_, flows) => flows.exists(isAutoCdcFlow) }
+ .foreach {
+ case (dest, flows) =>
+ throw new AnalysisException(
+ "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET",
+ Map(
+ "tableName" -> dest.unquotedString,
+ "flows" -> flows
+ .map(_.displayName)
+ .sorted
+ .mkString(", ")
+ )
+ )
+ }
+
// Non-streaming tables do not support multiflow.
multiQueryTables
.find {
@@ -58,6 +76,12 @@ trait GraphValidations extends Logging {
multiQueryTables
}
+ /** Returns true iff the given flow is an [[AutoCdcFlow]] (resolved or not).
*/
+ private def isAutoCdcFlow(f: Flow): Boolean = f match {
+ case _: AutoCdcFlow | _: AutoCdcMergeFlow => true
+ case _ => false
+ }
+
/**
* Validate that each resolved flow is correctly either a streaming flow or
non-streaming flow,
* depending on the flow type (ex. once flow vs non-once flow) and the
dataset type the flow
@@ -126,8 +150,21 @@ trait GraphValidations extends Logging {
)
}
case _: TemporaryView =>
- // Temporary views' flows are allowed to be either streaming or
batch, so no
- // validation needs to be done for them
+ // Temporary views' flows are generally allowed to be either
streaming or batch.
+ resolvedFlow match {
+ case _: AutoCdcMergeFlow =>
+ // The exception is AutoCDC flows, which require a
streaming-table sink to
+ // immediately execute MERGE against.
+ throw new AnalysisException(
+ errorClass =
+
"INVALID_FLOW_QUERY_TYPE.AUTOCDC_RELATION_FOR_TEMPORARY_VIEW",
+ messageParameters = Map(
+ "flowIdentifier" -> resolvedFlow.identifier.quotedString,
+ "viewIdentifier" -> destTableIdentifier.quotedString
+ )
+ )
+ case _ => // OK: any other flow is permitted to target a
temporary view.
+ }
}
}
}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
index 829179142dc5..4dfd09693578 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
@@ -237,7 +237,7 @@ class SqlGraphRegistrationContext(
// Register flow that backs this streaming table.
graphRegistrationContext.registerFlow(
- UnresolvedFlow(
+ UntypedFlow(
identifier = stIdentifier,
destinationIdentifier = stIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cst.query),
@@ -288,7 +288,7 @@ class SqlGraphRegistrationContext(
// Register flow that backs this materialized view.
graphRegistrationContext.registerFlow(
- UnresolvedFlow(
+ UntypedFlow(
identifier = mvIdentifier,
destinationIdentifier = mvIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cmv.query),
@@ -331,7 +331,7 @@ class SqlGraphRegistrationContext(
// Register flow that backs this persisted view.
graphRegistrationContext.registerFlow(
- UnresolvedFlow(
+ UntypedFlow(
identifier = viewIdentifier,
destinationIdentifier = viewIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cv.query),
@@ -375,7 +375,7 @@ class SqlGraphRegistrationContext(
// Register flow definition that backs this temporary view.
graphRegistrationContext.registerFlow(
- UnresolvedFlow(
+ UntypedFlow(
identifier = viewIdentifier,
destinationIdentifier = viewIdentifier,
func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cvc.plan),
@@ -451,7 +451,7 @@ class SqlGraphRegistrationContext(
.identifier
graphRegistrationContext.registerFlow(
- UnresolvedFlow(
+ UntypedFlow(
identifier = flowIdentifier,
destinationIdentifier = qualifiedDestinationIdentifier,
func =
FlowAnalysis.createFlowFunctionFromLogicalPlan(flowQueryLogicalPlan),
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
new file mode 100644
index 000000000000..8d365906559b
--- /dev/null
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import java.util.Locale
+
+import scala.util.Success
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Column,
QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.graph.{
+ AutoCdcFlow,
+ AutoCdcMergeFlow,
+ FlowFunction,
+ FlowFunctionResult,
+ Input,
+ QueryContext,
+ QueryOrigin
+}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{DataType, IntegerType, LongType,
StringType, StructField, StructType}
+
+/**
+ * Unit tests for [[AutoCdcFlow]] and [[AutoCdcMergeFlow]] that run neither graph analysis
+ * nor graph execution.
+ */
+class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
+
+ private val testIdentifier = TableIdentifier("cdc_target", Some("db"))
+
+ /** A placeholder [[FlowFunction]] that throws if invoked; AutoCdcFlow tests
should never call it. */
+ private val noOpFlowFunction: FlowFunction = new FlowFunction {
+ override def call(
+ allInputs: Set[TableIdentifier],
+ availableInputs: Seq[Input],
+ configuration: Map[String, String],
+ queryContext: QueryContext,
+ queryOrigin: QueryOrigin): FlowFunctionResult =
+ throw new UnsupportedOperationException(
+ "noOpFlowFunction.call should not be invoked from AutoCdcFlowSuite
tests"
+ )
+ }
+
+ private val testQueryContext =
+ QueryContext(currentCatalog = Some("test_catalog"), currentDatabase =
Some("test_db"))
+
+ private val testChangeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+
+ private def newAutoCdcFlow(
+ identifier: TableIdentifier = testIdentifier,
+ destinationIdentifier: TableIdentifier = testIdentifier,
+ func: FlowFunction = noOpFlowFunction,
+ queryContext: QueryContext = testQueryContext,
+ sqlConf: Map[String, String] = Map.empty,
+ comment: Option[String] = None,
+ origin: QueryOrigin = QueryOrigin.empty,
+ changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = {
+ AutoCdcFlow(
+ identifier = identifier,
+ destinationIdentifier = destinationIdentifier,
+ func = func,
+ queryContext = queryContext,
+ sqlConf = sqlConf,
+ comment = comment,
+ origin = origin,
+ changeArgs = changeArgs
+ )
+ }
+
+ test("AutoCdcFlow exposes its constructor fields") {
+ val flow = newAutoCdcFlow(
+ sqlConf = Map("spark.sql.shuffle.partitions" -> "8"),
+ comment = Some("my CDC flow")
+ )
+
+ assert(flow.identifier == testIdentifier)
+ assert(flow.destinationIdentifier == testIdentifier)
+ assert(flow.func eq noOpFlowFunction)
+ assert(flow.queryContext == testQueryContext)
+ assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8"))
+ assert(flow.comment.contains("my CDC flow"))
+ assert(flow.origin == QueryOrigin.empty)
+ assert(flow.changeArgs == testChangeArgs)
+ }
+
+ test("AutoCdcFlow defaults sqlConf to empty and comment to None") {
+ // Confirms the case-class default values match the documented contract;
downstream
+ // registration code relies on `sqlConf` being a non-null empty map by
default so that
+ // `defaultSqlConf ++ flowDef.sqlConf` is well-defined in
[[GraphRegistrationContext]].
+ val flow = AutoCdcFlow(
+ identifier = testIdentifier,
+ destinationIdentifier = testIdentifier,
+ func = noOpFlowFunction,
+ queryContext = testQueryContext,
+ origin = QueryOrigin.empty,
+ changeArgs = testChangeArgs
+ )
+
+ assert(flow.sqlConf.isEmpty)
+ assert(flow.comment.isEmpty)
+ }
+
+ test("AutoCdcFlow.once is always false") {
+ // AutoCDC flows are streaming-only and must run on every batch trigger,
never as a
+ // one-shot full-refresh-style flow. Locking this in so a future refactor
doesn't
+ // accidentally make `once` configurable.
+
+ // In the future we may intentionally add [[once]] support for AutoCDC
flows, at which point
+ // this test can safely be removed.
+ val flow = newAutoCdcFlow()
+ assert(!flow.once)
+ }
+
+ test("AutoCdcFlow.withSqlConf returns a new instance with the updated
sqlConf") {
+ val original = newAutoCdcFlow(sqlConf = Map("a" -> "1"))
+ val updated = original.withSqlConf(Map("b" -> "2"))
+
+ assert(updated.sqlConf == Map("b" -> "2"))
+ // All other fields should be preserved verbatim.
+ assert(updated.identifier == original.identifier)
+ assert(updated.destinationIdentifier == original.destinationIdentifier)
+ assert(updated.func eq original.func)
+ assert(updated.queryContext == original.queryContext)
+ assert(updated.comment == original.comment)
+ assert(updated.origin == original.origin)
+ assert(updated.changeArgs == original.changeArgs)
+ // The original must not be mutated.
+ assert(original.sqlConf == Map("a" -> "1"))
+ }
+
+ //
===========================================================================================
+ // AutoCdcMergeFlow.schema tests
+ //
===========================================================================================
+
+ /** Materializes a successful [[FlowFunctionResult]] backed by the given
source dataframe. */
+ private def successfulFuncResult(sourceDf: DataFrame): FlowFunctionResult =
+ FlowFunctionResult(
+ requestedInputs = Set.empty,
+ batchInputs = Set.empty,
+ streamingInputs = Set.empty,
+ usedExternalInputs = Set.empty,
+ dataFrame = Success(sourceDf),
+ sqlConf = Map.empty
+ )
+
+ /** Builds an [[AutoCdcMergeFlow]] over the given source dataframe + change
args. */
+ private def newAutoCdcMergeFlow(
+ sourceDf: DataFrame,
+ keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")),
+ sequencing: Column = F.col("seq"),
+ storedAsScdType: ScdType = ScdType.Type1,
+ columnSelection: Option[ColumnSelection] = None): AutoCdcMergeFlow = {
+ val flow = newAutoCdcFlow(
+ changeArgs = ChangeArgs(
+ keys = keys,
+ sequencing = sequencing,
+ storedAsScdType = storedAsScdType,
+ columnSelection = columnSelection
+ )
+ )
+ new AutoCdcMergeFlow(flow, successfulFuncResult(sourceDf))
+ }
+
+ /** A stable 3-column source CDF schema used across most schema tests. */
+ private def threeColumnSourceDf(): DataFrame = {
+ val schema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType)
+ .add("seq", LongType)
+
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
schema)
+ }
+
+ /** Convenience to extract the [[StructType]] of the projected
`_cdc_metadata` column. */
+ private def cdcMetadataStruct(schema: StructType): StructType =
+
schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+
+ test(
+ "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when
no " +
+ "columnSelection is set"
+ ) {
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test("AutoCdcMergeFlow.schema applies an IncludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq"))
+ )
+ )
+ )
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test("AutoCdcMergeFlow.schema applies an ExcludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+ )
+ )
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test(
+ "AutoCdcMergeFlow.schema's _cdc_metadata struct uses the resolved
sequencing data type"
+ ) {
+ // Source has a Long `seq` column; sequencing is `cast(seq as int)`, so
the projected
+ // `_cdc_metadata` fields should be Int (not Long), demonstrating that the
sequencing
+ // expression's *resolved* type drives the metadata schema.
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ sequencing = F.col("seq").cast(IntegerType)
+ )
+
+ val metaStruct = cdcMetadataStruct(resolvedFlow.schema)
+ assert(metaStruct == Scd1BatchProcessor.cdcMetadataColSchema(IntegerType))
+ }
+
+ test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with
nullable inner fields") {
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+ val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName)
+ assert(!metaField.nullable, "_cdc_metadata column itself must be non-null")
+
+ val metaStruct = metaField.dataType.asInstanceOf[StructType]
+ assert(metaStruct(Scd1BatchProcessor.cdcDeleteSequenceFieldName).nullable)
+ assert(metaStruct(Scd1BatchProcessor.cdcUpsertSequenceFieldName).nullable)
+ }
+
+ test("AutoCdcMergeFlow.schema is stable across reads") {
+ // The schema computation calls `df.select(sequencing).schema`, which
triggers Spark
+ // analysis. The eagerly-initialized `val` caches the result so downstream
consumers get
+ // a stable schema instance across reads.
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+ val first = resolvedFlow.schema
+ val second = resolvedFlow.schema
+ assert(first eq second, "schema should be cached as a val and return the
same instance")
+ }
+
+ test("AutoCdcMergeFlow rejects SCD2 at construction with
AUTOCDC_SCD2_NOT_SUPPORTED") {
+ // Constructing the flow forces the resolved schema, which is unsupported
for SCD2 today.
+ // Failing eagerly (rather than deferring to the first downstream `schema`
read) is the
+ // intended UX -- pipeline graph analysis should not be able to register
an SCD2 AutoCDC
+ // flow at all.
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ storedAsScdType = ScdType.Type2
+ )
+ },
+ condition = "AUTOCDC_SCD2_NOT_SUPPORTED",
+ sqlState = "0A000",
+ parameters = Map.empty
+ )
+ }
+
+ //
===========================================================================================
+ // AutoCdcMergeFlow.load() contract tests
+ //
===========================================================================================
+
+ test("AutoCdcMergeFlow.load() schema matches AutoCdcMergeFlow.schema") {
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+ val loadedDf = resolvedFlow.load(readOptions = null)
+ assert(loadedDf.schema == resolvedFlow.schema)
+ }
+
+ test("AutoCdcMergeFlow.load() respects an IncludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq"))
+ )
+ )
+ )
+ val loadedDf = resolvedFlow.load(readOptions = null)
+ assert(loadedDf.schema == resolvedFlow.schema)
+ // The user-selected portion drops `name`; the trailing column is the SCD1
metadata.
+ assert(
+ loadedDf.schema.fieldNames.toSeq ==
+ Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+ )
+ }
+
+ test("AutoCdcMergeFlow.load() respects an ExcludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+ )
+ )
+ val loadedDf = resolvedFlow.load(readOptions = null)
+ assert(loadedDf.schema == resolvedFlow.schema)
+ assert(
+ loadedDf.schema.fieldNames.toSeq ==
+ Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+ )
+ }
+
+ test("AutoCdcMergeFlow.load() materializes the CDC metadata column as
null-filled") {
+ // The merge engine fills in the metadata at execution time; at planning
time we synthesize
+ // a typed null placeholder so that load().schema matches schema. This
test pins down the
+ // placeholder shape: outer struct non-null, inner fields null-filled.
+ val schema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType)
+ .add("seq", LongType)
+ val sourceRows = java.util.Arrays.asList(
+ org.apache.spark.sql.Row(1, "a", 100L),
+ org.apache.spark.sql.Row(2, "b", 200L)
+ )
+ val sourceDf = spark.createDataFrame(sourceRows, schema)
+ val resolvedFlow = newAutoCdcMergeFlow(sourceDf)
+
+ val loadedDf = resolvedFlow.load(readOptions = null)
+ val collected = loadedDf.collect()
+ assert(collected.length == 2)
+
+ val metaIdx =
loadedDf.schema.fieldIndex(Scd1BatchProcessor.cdcMetadataColName)
+ collected.foreach { row =>
+ assert(!row.isNullAt(metaIdx), "_cdc_metadata struct itself should be
non-null")
+ val metaRow = row.getStruct(metaIdx)
+ assert(metaRow.isNullAt(0), "deleteSequence placeholder should be null")
+ assert(metaRow.isNullAt(1), "upsertSequence placeholder should be null")
+ }
+ }
+
+ //
===========================================================================================
+ // AutoCdcMergeFlow reserved-prefix validation tests
+ //
+ // The two "Contract:" tests below lock in the high-level invariant that no
reserved-prefix
+ // column name can be referenced anywhere -- not in the source change-data
feed schema, and
+ // not in user-supplied [[ChangeArgs]] (keys or columnSelection). Together
they ensure that
+ // (a) users cannot opt out of the reserved CDC metadata column by omitting
it from the
+ // selected schema, and (b) users cannot opt in to (or out of) any other
reserved-prefix
+ // name we may reserve in the future for an internal CDC concern.
+ //
+ // The remaining tests pin down case-sensitivity nuances of the
source-schema validator.
+ //
===========================================================================================
+
+ /** Builds an empty source df with `id` + `seq` + the supplied extra
columns. */
+ private def sourceDfWithExtraColumns(extraColumns: (String, DataType)*):
DataFrame = {
+ val schema = extraColumns.foldLeft(
+ new StructType().add("id", IntegerType, nullable = false).add("seq",
LongType)
+ ) { case (acc, (name, dt)) => acc.add(name, dt) }
+
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
schema)
+ }
+
+ test(
+ "Contract: a source df column with the reserved AutoCDC prefix is rejected
at flow " +
+ "construction"
+ ) {
+ val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+ val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(sourceDf)
+ },
+ condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+ sqlState = "42710",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "columnName" -> conflictingName,
+ "schemaName" -> "changeDataFeed",
+ "reservedColumnNamePrefix" ->
Scd1BatchProcessor.reservedColumnNamePrefix
+ )
+ )
+ }
+
+ test(
+ "Contract: ChangeArgs referencing a reserved-prefix column is rejected
even when the " +
+ "source df is clean"
+ ) {
+ // The source df has no reserved-prefix columns, but referencing a
reserved-prefix column
+ // from any ChangeArgs path still fails at construction with a different
error. The
+ // reservation is on the name itself, not on its presence in the source
feed.
+ val cleanSourceDf = threeColumnSourceDf()
+ val reservedName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+
+ val keysEx = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = cleanSourceDf,
+ keys = Seq(UnqualifiedColumnName(reservedName))
+ )
+ }
+ assert(keysEx.getCondition == "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA")
+
+ val includeEx = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = cleanSourceDf,
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("id"),
UnqualifiedColumnName(reservedName))
+ )
+ )
+ )
+ }
+ assert(includeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA")
+
+ val excludeEx = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = cleanSourceDf,
+ columnSelection = Some(
+
ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName(reservedName)))
+ )
+ )
+ }
+ assert(excludeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA")
+ }
+
+ test(
+ "AutoCdcMergeFlow rejects a source df column whose name equals the
reserved CDC " +
+ "metadata column"
+ ) {
+ // Locks in, at flow-construction time, the guard previously enforced at the engine level by
+ // Scd1BatchProcessor.extendMicrobatchRowsWithCdcMetadata. Any future regression where a
+ // user-supplied CDC stream carries the reserved metadata column name should fail eagerly here.
+ val sourceDf =
sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(sourceDf)
+ },
+ condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+ sqlState = "42710",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
+ "schemaName" -> "changeDataFeed",
+ "reservedColumnNamePrefix" ->
Scd1BatchProcessor.reservedColumnNamePrefix
+ )
+ )
+ }
+
+ test(
+ "AutoCdcMergeFlow rejects an uppercase reserved-prefix column when
caseSensitive=false"
+ ) {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ val conflictingName =
+
s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+ val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(sourceDf)
+ },
+ condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+ sqlState = "42710",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "columnName" -> conflictingName,
+ "schemaName" -> "changeDataFeed",
+ "reservedColumnNamePrefix" ->
Scd1BatchProcessor.reservedColumnNamePrefix
+ )
+ )
+ }
+ }
+
+ test(
+ "AutoCdcMergeFlow allows an uppercase reserved-prefix column when
caseSensitive=true"
+ ) {
+ // Under case-sensitive analysis, the uppercase variant is a distinct
identifier and does
+ // not collide with the lowercase reserved namespace. Locks in that the
validation respects
+ // `spark.sql.caseSensitive`, consistent with the schema-augmentation
logic in this class.
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ val nonConflictingName =
+
s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+ val sourceDf = sourceDfWithExtraColumns(nonConflictingName -> StringType)
+
+ // No exception expected: construction succeeds.
+ newAutoCdcMergeFlow(sourceDf)
+ }
+ }
+
+ //
===========================================================================================
+ // AutoCdcMergeFlow keys-presence validation tests
(requireKeysPresentInSelectedSchema)
+ //
===========================================================================================
+
+ test("AutoCdcMergeFlow rejects a key that is not present in the source
change-data feed") {
+ // No columnSelection: the post-selection schema equals the source schema.
The key `id`
+ // is absent from the source df entirely, so the validator must surface a
CDC-specific
+ // error rather than deferring to Spark's generic UNRESOLVED_COLUMN.
+ val schema = new StructType()
+ .add("name", StringType)
+ .add("seq", LongType)
+ val sourceDf =
+
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row],
schema)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(sourceDf)
+ },
+ condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+ sqlState = "22023",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "keyColumnName" -> "id"
+ )
+ )
+ }
+
+ test("AutoCdcMergeFlow rejects a key dropped by an IncludeColumns
selection") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("seq"))
+ )
+ )
+ )
+ },
+ condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+ sqlState = "22023",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "keyColumnName" -> "id"
+ )
+ )
+ }
+
+ test("AutoCdcMergeFlow rejects a key dropped by an ExcludeColumns
selection") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id")))
+ )
+ )
+ },
+ condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+ sqlState = "22023",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "keyColumnName" -> "id"
+ )
+ )
+ }
+}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index c78dc123621b..9432150c4016 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.pipelines.autocdc
-import java.util.Locale
-
import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row}
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.internal.SQLConf
@@ -671,81 +669,6 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION")
}
- test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already
contains the " +
- "reserved CDC metadata column") {
- withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
- val schema = new StructType()
- .add("id", IntegerType)
- .add("seq", LongType)
- .add(Scd1BatchProcessor.cdcMetadataColName, StringType)
-
- val batch = microbatchOf(schema)(
- Row(1, 10L, "user-supplied")
- )
-
- val processor = Scd1BatchProcessor(
- changeArgs = ChangeArgs(
- keys = Seq(UnqualifiedColumnName("id")),
- sequencing = F.col("seq"),
- storedAsScdType = ScdType.Type1
- ),
- resolvedSequencingType = LongType
- )
-
- checkError(
- exception = intercept[AnalysisException] {
- processor.extendMicrobatchRowsWithCdcMetadata(batch)
- },
- condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
- sqlState = "42710",
- parameters = Map(
- "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
- "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
- "schemaName" -> "microbatch",
- "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
- )
- )
- }
- }
-
- test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata
column " +
- "case-insensitively") {
- withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
- val conflictingColumnName =
Scd1BatchProcessor.cdcMetadataColName.toUpperCase(Locale.ROOT)
- val schema = new StructType()
- .add("id", IntegerType)
- .add("seq", LongType)
- .add(conflictingColumnName, StringType)
-
- val batch = microbatchOf(schema)(
- Row(1, 10L, "user-supplied")
- )
-
- val processor = Scd1BatchProcessor(
- changeArgs = ChangeArgs(
- keys = Seq(UnqualifiedColumnName("id")),
- sequencing = F.col("seq"),
- storedAsScdType = ScdType.Type1
- ),
- resolvedSequencingType = LongType
- )
-
- checkError(
- exception = intercept[AnalysisException] {
- processor.extendMicrobatchRowsWithCdcMetadata(batch)
- },
- condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
- sqlState = "42710",
- parameters = Map(
- "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
- "columnName" -> conflictingColumnName,
- "schemaName" -> "microbatch",
- "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
- )
- )
- }
- }
-
test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC
metadata column " +
"when columnSelection is None") {
val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index f37716b4a24d..f19fed4e5780 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.pipelines.graph
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ScdType,
UnqualifiedColumnName}
import org.apache.spark.sql.pipelines.utils.{PipelineTest,
TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -547,4 +548,220 @@ class ConnectInvalidPipelineSuite extends PipelineTest
with SharedSparkSession {
assert(!ex1.getMessage.contains(streamingTableHint))
assert(ex2.getMessage.contains(streamingTableHint))
}
+
+ test(
+ "AutoCDC flow targeting a materialized view fails with " +
+ "STREAMING_RELATION_FOR_MATERIALIZED_VIEW"
+ ) {
+ val session = spark
+ import session.implicits._
+
+ val graph = new TestGraphRegistrationContext(spark) {
+ val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id",
$"value" as "seq")
+ registerTable(
+ Table(
+ identifier = fullyQualifiedIdentifier("target"),
+ comment = None,
+ specifiedSchema = None,
+ partitionCols = None,
+ clusterCols = None,
+ properties = Map.empty,
+ origin = QueryOrigin.empty,
+ format = Some("parquet"),
+ normalizedPath = None,
+ isStreamingTable = false
+ )
+ )
+ registerFlow(
+ AutoCdcFlow(
+ identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+ destinationIdentifier = fullyQualifiedIdentifier("target"),
+ func = dfFlowFunc(cdcEvents),
+ queryContext = QueryContext(
+ currentCatalog =
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+ currentDatabase =
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+ ),
+ origin = QueryOrigin.empty,
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = $"seq",
+ storedAsScdType = ScdType.Type1
+ )
+ )
+ )
+ }.resolveToDataflowGraph()
+
+ val ex = intercept[AnalysisException] {
+ graph.validate()
+ }
+
+ checkError(
+ exception = ex,
+ condition =
"INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_MATERIALIZED_VIEW",
+ parameters = Map(
+ "flowIdentifier" ->
fullyQualifiedIdentifier("auto_cdc_flow").quotedString,
+ "tableIdentifier" -> fullyQualifiedIdentifier("target").quotedString
+ )
+ )
+ }
+
+ test(
+ "AutoCDC flow targeting a persisted view fails with
STREAMING_RELATION_FOR_PERSISTED_VIEW"
+ ) {
+ val session = spark
+ import session.implicits._
+
+ val graph = new TestGraphRegistrationContext(spark) {
+ val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id",
$"value" as "seq")
+ registerView(
+ PersistedView(
+ identifier = fullyQualifiedIdentifier("target_view"),
+ properties = Map.empty,
+ sqlText = None,
+ comment = None,
+ origin = QueryOrigin.empty
+ )
+ )
+ registerFlow(
+ AutoCdcFlow(
+ identifier = fullyQualifiedIdentifier("target_view"),
+ destinationIdentifier = fullyQualifiedIdentifier("target_view"),
+ func = dfFlowFunc(cdcEvents),
+ queryContext = QueryContext(
+ currentCatalog =
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+ currentDatabase =
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+ ),
+ origin = QueryOrigin.empty,
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = $"seq",
+ storedAsScdType = ScdType.Type1
+ )
+ )
+ )
+ }.resolveToDataflowGraph()
+
+ val ex = intercept[AnalysisException] {
+ graph.validate()
+ }
+
+ checkError(
+ exception = ex,
+ condition =
"INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_PERSISTED_VIEW",
+ parameters = Map(
+ "flowIdentifier" ->
fullyQualifiedIdentifier("target_view").quotedString,
+ "viewIdentifier" ->
fullyQualifiedIdentifier("target_view").quotedString
+ )
+ )
+ }
+
+ test(
+ "AutoCDC flow targeting a temporary view fails with
AUTOCDC_RELATION_FOR_TEMPORARY_VIEW"
+ ) {
+ // Temporary views in SDP normally accept either streaming or batch
producing flows, but
+ // AutoCDC flows are an explicit exception: SCD reconciliation only runs
at the
+ // streaming-table sink (`Scd1ForeachBatchExec`), so pointing an AutoCDC
flow at a view
+ // would silently drop reconciliation and expose just the projected CDF to
consumers.
+ // `validateFlowStreamingness` rejects this case with a dedicated
sub-condition under
+ // INVALID_FLOW_QUERY_TYPE.
+ val session = spark
+ import session.implicits._
+
+ val graph = new TestGraphRegistrationContext(spark) {
+ val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id",
$"value" as "seq")
+ // A pipeline must contain at least one non-temporary dataset; register
an unrelated
+ // streaming table so the pipeline is non-empty and we can exercise the
AutoCDC path.
+ registerTable(
+ "dummy_table",
+ query = Some(dfFlowFunc(MemoryStream[Int].toDF()))
+ )
+ registerView(
+ TemporaryView(
+ identifier = fullyQualifiedIdentifier("target_view"),
+ properties = Map.empty,
+ sqlText = None,
+ comment = None,
+ origin = QueryOrigin.empty
+ )
+ )
+ registerFlow(
+ AutoCdcFlow(
+ identifier = fullyQualifiedIdentifier("target_view"),
+ destinationIdentifier = fullyQualifiedIdentifier("target_view"),
+ func = dfFlowFunc(cdcEvents),
+ queryContext = QueryContext(
+ currentCatalog =
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+ currentDatabase =
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+ ),
+ origin = QueryOrigin.empty,
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = $"seq",
+ storedAsScdType = ScdType.Type1
+ )
+ )
+ )
+ }.resolveToDataflowGraph()
+
+ val ex = intercept[AnalysisException] {
+ graph.validate()
+ }
+
+ checkError(
+ exception = ex,
+ condition =
"INVALID_FLOW_QUERY_TYPE.AUTOCDC_RELATION_FOR_TEMPORARY_VIEW",
+ parameters = Map(
+ "flowIdentifier" ->
fullyQualifiedIdentifier("target_view").quotedString,
+ "viewIdentifier" ->
fullyQualifiedIdentifier("target_view").quotedString
+ )
+ )
+ }
+
+ test("A multiquery table cannot have an AutoCDC query input") {
+ val session = spark
+ import session.implicits._
+
+ val graph = new TestGraphRegistrationContext(spark) {
+ val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id",
$"value" as "seq")
+ registerTable("target")
+ registerFlow(
+ AutoCdcFlow(
+ identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+ destinationIdentifier = fullyQualifiedIdentifier("target"),
+ func = dfFlowFunc(cdcEvents),
+ queryContext = QueryContext(
+ currentCatalog =
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+ currentDatabase =
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+ ),
+ origin = QueryOrigin.empty,
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = $"seq",
+ storedAsScdType = ScdType.Type1
+ )
+ )
+ )
+ registerFlow(
+ destinationName = "target",
+ name = "extra_flow",
+ query = dfFlowFunc(MemoryStream[Int].toDF().select($"value" as "id",
$"value" as "seq"))
+ )
+ }.resolveToDataflowGraph()
+
+ val ex = intercept[AnalysisException] {
+ graph.validate()
+ }
+
+ checkError(
+ exception = ex,
+ condition = "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET",
+ parameters = Map(
+ "tableName" -> fullyQualifiedIdentifier("target").unquotedString,
+ "flows" -> Seq(
+ fullyQualifiedIdentifier("auto_cdc_flow").unquotedString,
+ fullyQualifiedIdentifier("extra_flow").unquotedString
+ ).sorted.mkString(", ")
+ )
+ )
+ }
}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
index 3ac3c0901750..3c7db2cca889 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.Union
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ScdType,
UnqualifiedColumnName}
import org.apache.spark.sql.pipelines.utils.{PipelineTest,
TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -509,6 +510,38 @@ class ConnectValidPipelineSuite extends PipelineTest with
SharedSparkSession {
assert(g.flow(TableIdentifier("sink_flow")).isInstanceOf[StreamingFlow])
}
+ test("AutoCdcFlow registers and resolves to AutoCdcMergeFlow") { // a lone AutoCdcFlow should come out of graph resolution as an AutoCdcMergeFlow
+ val session = spark
+ import session.implicits._ // needed for the $"..." column syntax used below
+
+ val P = new TestGraphRegistrationContext(spark) {
+ val mem = MemoryStream[Int]
+ val cdcEvents = mem.toDF().select($"value" as "id", $"value" as "seq") // "id" and "seq" are both copies of "value"
+ registerTable("target")
+ registerFlow( // the only flow registered against "target" in this test
+ AutoCdcFlow(
+ identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+ destinationIdentifier = fullyQualifiedIdentifier("target"),
+ func = dfFlowFunc(cdcEvents),
+ queryContext = QueryContext(
+ currentCatalog =
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+ currentDatabase =
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+ ),
+ origin = QueryOrigin.empty,
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = $"seq",
+ storedAsScdType = ScdType.Type1
+ )
+ )
+ )
+ }
+ val g = P.resolveToDataflowGraph() // resolution only; validate() is not called in this test
+ assert(
+
g.flow(fullyQualifiedIdentifier("auto_cdc_flow")).isInstanceOf[AutoCdcMergeFlow] // looks the flow up by the identifier it was registered under
+ )
+ }
+
/** Verifies the [[DataflowGraph]] has the specified [[Flow]] with the
specified schema. */
private def verifyFlowSchema(
pipeline: DataflowGraph,
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
index 9ff92ee895b1..f5bdf87a6cc6 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{LocalTempView, PersistedView =>
PersistedViewType, UnresolvedRelation, ViewType}
import org.apache.spark.sql.classic.{DataFrame, SparkSession}
-import org.apache.spark.sql.pipelines.graph.{DataflowGraph, FlowAnalysis,
FlowFunction, GraphIdentifierManager, GraphRegistrationContext, PersistedView,
QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, Table,
TemporaryView, UnresolvedFlow}
+import org.apache.spark.sql.pipelines.graph.{DataflowGraph, FlowAnalysis,
FlowFunction, GraphIdentifierManager, GraphRegistrationContext, PersistedView,
QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, Table,
TemporaryView, UntypedFlow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -176,7 +176,7 @@ class TestGraphRegistrationContext(
if (query.isDefined) {
registerFlow(
- new UnresolvedFlow(
+ UntypedFlow(
identifier = qualifiedIdentifier,
destinationIdentifier = qualifiedIdentifier,
func = query.get,
@@ -267,7 +267,7 @@ class TestGraphRegistrationContext(
)
registerFlow(
- new UnresolvedFlow(
+ UntypedFlow(
identifier = viewIdentifier,
destinationIdentifier = viewIdentifier,
func = query,
@@ -339,7 +339,7 @@ class TestGraphRegistrationContext(
}
registerFlow(
- new UnresolvedFlow(
+ UntypedFlow(
identifier = flowIdentifier,
destinationIdentifier = flowDestinationIdentifier,
func = query,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]