This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49aaed209dc8 [SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses
49aaed209dc8 is described below

commit 49aaed209dc884f4022bf7f507ce580ea6b4225d
Author: AnishMahto <[email protected]>
AuthorDate: Tue May 26 15:30:16 2026 +0800

    [SPARK-56956][SDP] Introduce AutoCDC Flow Dataclasses
    
    Approved AutoCDC SPIP: 
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
    
    --------
    
    ### What changes were proposed in this pull request?
    Introduce dataclass for unresolved AutoCDC flow (`AutoCdcFlow`) and 
resolved AutoCDC flow (`AutoCdcMergeFlow`). Add wiring to analyze an 
`AutoCdcFlow` to an `AutoCdcMergeFlow`.
    
    A small refactor was additionally made on the `UnresolvedFlow` and 
`ResolvedFlow` class hierarchy.
    
    ### Why are the changes needed?
    Support AutoCDC flow registration and analysis. AutoCDC flow execution will 
be supported in a future PR. Previously, an `UnresolvedFlow` additionally 
always represented an untyped-flow; a flow where do not yet know its 
execution-type, i.e streaming, append-once, etc.
    
    `AutoCdcFlow` is a specialized flow with support for only streaming flows, 
hence it represents a flow whose execution-type we know at construction. It is 
still unresolved at registration time, and needs to go through resolution to 
determine its position in the DAG and its input/output schemas.
    
    Hence we introduce the intermediary child `UntypedFlow` for 
`UnresolvedFlow`, which all previous flows are classified as during 
registration. An `AutoCdcFlow` directly implements `UnresolvedFlow` (skipping 
`UntypedFlow in its inheritance chain) because it is not untyped.
    
    ### Does this PR introduce _any_ user-facing change?
    No, the AutoCDC feature is not released anywhere yet.
    
    ### How was this patch tested?
    `ConnectValidPipelineSuite` and `AutoCdcFlowSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Co-authored.
    
    Generated-by: Claude-Opus-4.7-thinking-xhigh
    
    Closes #56042 from AnishMahto/SPARK-56956-introduce-flow-data-classes.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  27 +-
 .../sql/connect/pipelines/PipelinesHandler.scala   |   4 +-
 .../spark/sql/pipelines/autocdc/ChangeArgs.scala   |   2 +-
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala |  16 +-
 .../graph/CoreDataflowNodeProcessor.scala          |  15 +-
 .../apache/spark/sql/pipelines/graph/Flow.scala    | 192 ++++++-
 .../pipelines/graph/GraphRegistrationContext.scala |   2 +-
 .../sql/pipelines/graph/GraphValidations.scala     |  41 +-
 .../graph/SqlGraphRegistrationContext.scala        |  10 +-
 .../sql/pipelines/autocdc/AutoCdcFlowSuite.scala   | 601 +++++++++++++++++++++
 .../autocdc/Scd1BatchProcessorSuite.scala          |  77 ---
 .../graph/ConnectInvalidPipelineSuite.scala        | 217 ++++++++
 .../graph/ConnectValidPipelineSuite.scala          |  33 ++
 .../utils/TestGraphRegistrationContext.scala       |   8 +-
 14 files changed, 1140 insertions(+), 105 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index d907d027b0ed..9c9a657bc6e9 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -203,6 +203,12 @@
     ],
     "sqlState" : "22023"
   },
+  "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : {
+    "message" : [
+      "Using <caseSensitivity> column name comparison, the AutoCDC key column 
`<keyColumnName>` is not present in the flow's selected source schema. AutoCDC 
requires every key column to be present in the source change-data feed and 
retained by any configured column selection."
+    ],
+    "sqlState" : "22023"
+  },
   "AUTOCDC_MICROBATCH_VALIDATION" : {
     "message" : [
       "AutoCDC flow on table <tableName> in batch <batchId> failed microbatch 
validation."
@@ -232,12 +238,24 @@
     ],
     "sqlState" : "42703"
   },
-  "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : {
+  "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET" : {
+    "message" : [
+      "Invalid AutoCDC destination <tableName> with multiple flows: <flows>. 
An AutoCDC target table must have exactly one flow writing to it."
+    ],
+    "sqlState" : "42000"
+  },
+  "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT" : {
     "message" : [
-      "Using <caseSensitivity> column name comparison, the column 
`<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC 
column name `<reservedColumnName>`. Rename or remove the column."
+      "The column `<columnName>` in the <schemaName> schema collides with the 
reserved AutoCDC column name prefix `<reservedColumnNamePrefix>` (using 
<caseSensitivity> column name comparison). Rename or remove the column."
     ],
     "sqlState" : "42710"
   },
+  "AUTOCDC_SCD2_NOT_SUPPORTED" : {
+    "message" : [
+      "AutoCDC flows do not currently support SCD Type 2 transformations."
+    ],
+    "sqlState" : "0A000"
+  },
   "AVRO_CANNOT_WRITE_NULL_FIELD" : {
     "message" : [
       "Cannot write null value for field <name> defined as non-null Avro data 
type <dataType>.",
@@ -3710,6 +3728,11 @@
       "Flow <flowIdentifier> returns an invalid relation type."
     ],
     "subClass" : {
+      "AUTOCDC_RELATION_FOR_TEMPORARY_VIEW" : {
+        "message" : [
+          "AutoCDC flows must target a streaming table because their 
reconciliation semantics require a streaming-table sink, but the flow 
<flowIdentifier> attempts to write an AutoCDC relation to the temporary view 
<viewIdentifier>."
+        ]
+      },
       "BATCH_RELATION_FOR_STREAMING_TABLE" : {
         "message" : [
           "Streaming tables may only be defined by streaming relations, but 
the flow <flowIdentifier> attempts to write a batch relation to the streaming 
table <tableIdentifier>. Consider using the STREAM operator in Spark-SQL to 
convert the batch relation into a streaming relation, or populating the 
streaming table with an append once-flow instead."
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
index ee19b83c5162..04dbc1a45506 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/pipelines/PipelinesHandler.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.connect.service.SessionHolder
 import org.apache.spark.sql.execution.command.{ShowCatalogsCommand, 
ShowNamespacesCommand}
 import org.apache.spark.sql.pipelines.Language.Python
 import org.apache.spark.sql.pipelines.common.RunState.{CANCELED, FAILED}
-import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, 
GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, 
PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, 
SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, 
TemporaryView, UnresolvedFlow}
+import org.apache.spark.sql.pipelines.graph.{AllTables, FlowAnalysis, 
GraphIdentifierManager, GraphRegistrationContext, IdentifierHelper, NoTables, 
PipelineUpdateContextImpl, QueryContext, QueryOrigin, QueryOriginType, Sink, 
SinkImpl, SomeTables, SqlGraphRegistrationContext, Table, TableFilter, 
TemporaryView, UntypedFlow}
 import org.apache.spark.sql.pipelines.logging.{PipelineEvent, RunProgress}
 import org.apache.spark.sql.types.StructType
 
@@ -371,7 +371,7 @@ private[connect] object PipelinesHandler extends Logging {
       case proto.PipelineCommand.DefineFlow.DetailsCase.RELATION_FLOW_DETAILS 
=>
         val relationFlowDetails = flow.getRelationFlowDetails
         graphElementRegistry.registerFlow(
-          UnresolvedFlow(
+          UntypedFlow(
             identifier = flowIdentifier,
             destinationIdentifier = destinationIdentifier,
             func = FlowAnalysis.createFlowFunctionFromLogicalPlan(
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
index c17c89967baa..b975e06807f5 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -120,7 +120,7 @@ object ColumnSelection {
 }
 
 /** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */
-private[autocdc] object CaseSensitivityLabels {
+private[pipelines] object CaseSensitivityLabels {
   val CaseSensitive: String = "case-sensitive"
   val CaseInsensitive: String = "case-insensitive"
 
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 32aebc8924c0..aaea3b63e9ef 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -416,9 +416,15 @@ case class Scd1BatchProcessor(
 }
 
 object Scd1BatchProcessor {
-  // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP 
AutoCDC processing.
-  private[autocdc] val winningRowColName: String = 
"__spark_autocdc_winning_row"
-  private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata"
+  /**
+   * Reserved column-name prefix for internal SDP AutoCDC processing. Source 
change-data-feed
+   * dataframes must not contain any columns starting with this prefix; the 
invariant is
+   * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] 
construction.
+   */
+  private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"
+
+  private[autocdc] val winningRowColName: String = 
s"${reservedColumnNamePrefix}winning_row"
+  private[pipelines] val cdcMetadataColName: String = 
s"${reservedColumnNamePrefix}metadata"
 
   private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
   private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
@@ -434,7 +440,7 @@ object Scd1BatchProcessor {
   /**
    * Schema of the CDC metadata struct column for SCD1.
    */
-  private def cdcMetadataColSchema(sequencingType: DataType): StructType =
+  private[pipelines] def cdcMetadataColSchema(sequencingType: DataType): 
StructType =
     StructType(
       Seq(
         // The sequencing of the event if it represents a delete, null 
otherwise.
@@ -448,7 +454,7 @@ object Scd1BatchProcessor {
    * Construct the CDC metadata struct column for SCD1, following the exact 
schema and field
    * ordering defined by [[cdcMetadataColSchema]].
    */
-  private[autocdc] def constructCdcMetadataCol(
+  private[pipelines] def constructCdcMetadataCol(
       deleteSequence: Column,
       upsertSequence: Column,
       sequencingType: DataType): Column = {
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
index 38fde0bfec4a..66f2995ee02d 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala
@@ -176,7 +176,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
           } else {
             f
           }
-          convertResolvedToTypedFlow(flowToResolve, maybeNewFuncResult)
+          resolveFlow(flowToResolve, maybeNewFuncResult)
 
         // If the flow failed due to an UnresolvedDatasetException, it means 
that one of the
         // flow's inputs wasn't available. After other flows are resolved, 
these inputs
@@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) {
       }
   }
 
-  private def convertResolvedToTypedFlow(
+  private def resolveFlow(
       flow: UnresolvedFlow,
       funcResult: FlowFunctionResult): ResolvedFlow = {
+    flow match {
+      case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
+      case utf: UntypedFlow => transformUntypedFlowToResolvedFlow(utf, 
funcResult)
+    }
+  }
+
+  private def transformUntypedFlowToResolvedFlow(
+      flow: UntypedFlow,
+      funcResult: FlowFunctionResult): ResolvedFlow = {
     flow match {
       case _ if flow.once => new AppendOnceFlow(flow, funcResult)
       case _ if funcResult.dataFrame.get.isStreaming =>
@@ -210,7 +219,7 @@ private class FlowResolver(rawGraph: DataflowGraph) {
         // then get their results overwritten.
         val mustBeAppend = rawGraph.flowsTo(flow.destinationIdentifier).size > 
1
         new StreamingFlow(flow, funcResult, mustBeAppend = mustBeAppend)
-      case _: UnresolvedFlow => new CompleteFlow(flow, funcResult)
+      case _ => new CompleteFlow(flow, funcResult)
     }
   }
 }
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index e329308502f0..04ef8d3186c5 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -20,11 +20,19 @@ package org.apache.spark.sql.pipelines.graph
 import scala.util.Try
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{functions => F, AnalysisException, Column}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.classic.DataFrame
 import org.apache.spark.sql.pipelines.AnalysisWarning
+import org.apache.spark.sql.pipelines.autocdc.{
+  CaseSensitivityLabels,
+  ChangeArgs,
+  ColumnSelection,
+  Scd1BatchProcessor,
+  ScdType
+}
 import org.apache.spark.sql.pipelines.util.InputReadOptions
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 
 /**
  * Contains the catalog and database context information for query execution.
@@ -121,7 +129,21 @@ case class FlowFunctionResult(
 }
 
 /** A [[Flow]] whose output schema and dependencies aren't known. */
-case class UnresolvedFlow(
+sealed trait UnresolvedFlow extends Flow {
+  /** Returns a copy of this flow with the given SQL confs overriding the 
existing ones. */
+  def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow
+}
+
+/**
+ * An [[UnresolvedFlow]] whose execution-type has not yet been determined.
+ *
+ * In some cases, we know the execution-type for an [[UnresolvedFlow]] even 
before flow analysis
+ * and resolution. For example an AutoCDCFlow is a special 
unresolved-but-typed flow; we know a
+ * flow will be an AutoCDC flow immediately on construction, because it has 
its own special
+ * registration API. Such flows are considered "typed flows", but there isn't 
any semantic reason
+ * yet to explicitly introduce a `TypedFlow` trait/class.
+ */
+case class UntypedFlow(
     identifier: TableIdentifier,
     destinationIdentifier: TableIdentifier,
     func: FlowFunction,
@@ -129,7 +151,34 @@ case class UnresolvedFlow(
     sqlConf: Map[String, String],
     override val once: Boolean,
     override val origin: QueryOrigin
-) extends Flow
+) extends UnresolvedFlow {
+  override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow =
+    copy(sqlConf = newSqlConf)
+}
+
+/**
+ * An unresolved but typed flow that applies a CDC event stream to a target 
table via MERGE.
+ *
+ * [[AutoCdcFlow]] is a typed flow because it is only supported for streaming, 
and not as a once
+ * flow. Therefore by definition it is a streaming-type flow.
+ *
+ * In the future once-support for [[AutoCdcFlow]] may be added.
+ */
+case class AutoCdcFlow(
+    identifier: TableIdentifier,
+    destinationIdentifier: TableIdentifier,
+    func: FlowFunction,
+    queryContext: QueryContext,
+    sqlConf: Map[String, String] = Map.empty,
+    comment: Option[String] = None,
+    override val origin: QueryOrigin,
+    changeArgs: ChangeArgs
+) extends UnresolvedFlow {
+  override val once: Boolean = false
+
+  override def withSqlConf(newSqlConf: Map[String, String]): AutoCdcFlow =
+    copy(sqlConf = newSqlConf)
+}
 
 /**
  * A [[Flow]] whose flow function has been invoked, meaning either:
@@ -194,3 +243,140 @@ class AppendOnceFlow(
 
   override val once = true
 }
+
+/**
+ * A resolved flow that applies a CDC event stream to a target table via 
MERGE, in accordance to
+ * the configured [[flow.changeArgs]].
+ */
+class AutoCdcMergeFlow(
+    val flow: AutoCdcFlow,
+    val funcResult: FlowFunctionResult
+) extends ResolvedFlow {
+  requireReservedPrefixAbsentInSourceColumns()
+
+  def changeArgs: ChangeArgs = flow.changeArgs
+
+  /** The user-selected projection of [[df.schema]] (i.e. before the SCD 
metadata column). */
+  private val userSelectedSchema: StructType = {
+    val selectedSchema = ColumnSelection.applyToSchema(
+      schemaName = "changeDataFeed",
+      schema = df.schema,
+      columnSelection = changeArgs.columnSelection,
+      caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
+    )
+    // AutoCDC flows require all key columns to be present in the target 
table, to adhere to SCD
+    // semantics.
+    requireKeysPresentInSelectedSchema(selectedSchema)
+    selectedSchema
+  }
+
+  /** The DataType of the sequencing expression, derived once from the source 
change feed. */
+  private val sequencingType: DataType =
+    df.select(changeArgs.sequencing).schema.head.dataType
+
+  /**
+   * Returns the augmented output schema of this flow, which can differ from 
the schema of the
+   * source change-data-feed dataframe.
+   *
+   * The source dataframe's schema describes the incoming CDC events; the 
augmented schema here
+   * applies the user-specified [[ColumnSelection]] and appends the 
SCD-specific metadata
+   * columns that the AutoCDC MERGE engine projects onto the target table. 
Downstream
+   * dependencies in the pipeline see this augmented schema.
+   */
+  override val schema: StructType = changeArgs.storedAsScdType match {
+    case ScdType.Type1 =>
+      // SCD1 produces a target table with all the user-selected output 
columns and a projected
+      // CDC operational metadata column at the end.
+      StructType(
+        userSelectedSchema.fields :+ StructField(
+          Scd1BatchProcessor.cdcMetadataColName,
+          Scd1BatchProcessor.cdcMetadataColSchema(sequencingType),
+          nullable = false
+        )
+      )
+    case ScdType.Type2 =>
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
+        messageParameters = Map.empty
+      )
+  }
+
+  /**
+   * Returns an empty dataframe whose schema matches 
[[AutoCdcMergeFlow.schema]].
+   *
+   * Today, [[AutoCdcMergeFlow.load]] is not actually ever called during graph 
analysis or
+   * execution. An AutoCdcMergeFlow can only be an input to a streaming table 
(not an MV or
+   * persisted/temp view), and streaming tables take a [[VirtualTableInput]] 
as input, not
+   * the producing [[Flow]] directly. [[VirtualTableInput]] overrides its own 
[[load]] to do
+   * schema inference on its input flows, rather than a transitive 
[[Flow.load]].
+   *
+   * The [[AutoCdcMergeFlow.load]] implementation exists solely for API 
consistency.
+   */
+  override def load(readOptions: InputReadOptions): DataFrame = 
changeArgs.storedAsScdType match {
+    case ScdType.Type1 =>
+      val userSelectedCols: Seq[Column] = 
userSelectedSchema.fieldNames.toSeq.map(F.col)
+      val emptyCdcMetadataCol: Column = 
Scd1BatchProcessor.constructCdcMetadataCol(
+        deleteSequence = F.lit(null),
+        upsertSequence = F.lit(null),
+        sequencingType = sequencingType
+      ).as(Scd1BatchProcessor.cdcMetadataColName)
+
+      df.select(userSelectedCols :+ emptyCdcMetadataCol: _*)
+    case ScdType.Type2 =>
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
+        messageParameters = Map.empty
+      )
+  }
+
+  /**
+   * Validate that the resolved source dataframe for the AutoCDC flow does not 
contain any column
+   * names that use the reserved Spark AutoCDC prefix.
+   */
+  private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
+    val resolver = spark.sessionState.conf.resolver
+    val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix
+
+    def nameContainsReservedPrefix(name: String): Boolean = {
+      name.length >= reservedPrefix.length && resolver(
+        name.substring(0, reservedPrefix.length),
+        reservedPrefix
+      )
+    }
+
+    df.schema.fieldNames.find(nameContainsReservedPrefix).foreach { 
conflictingColumnName =>
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+        messageParameters = Map(
+          "caseSensitivity" -> CaseSensitivityLabels.of(
+            spark.sessionState.conf.caseSensitiveAnalysis
+          ),
+          "columnName" -> conflictingColumnName,
+          "schemaName" -> "changeDataFeed",
+          "reservedColumnNamePrefix" -> reservedPrefix
+        )
+      )
+    }
+  }
+
+  /**
+   * Validate all keys specified in changeArgs are actually present in the 
user-selected schema.
+   */
+  private def requireKeysPresentInSelectedSchema(selectedSchema: StructType): 
Unit = {
+    val resolver = spark.sessionState.conf.resolver
+
+    changeArgs.keys
+      .find(key => !selectedSchema.fieldNames.exists(name => resolver(name, 
key.name)))
+      .foreach { missingKey =>
+        throw new AnalysisException(
+          errorClass = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+          messageParameters = Map(
+            "caseSensitivity" -> CaseSensitivityLabels.of(
+              spark.sessionState.conf.caseSensitiveAnalysis
+            ),
+            "keyColumnName" -> missingKey.name
+          )
+        )
+      }
+  }
+}
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
index dadda0561b19..970fdb4b70e9 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala
@@ -59,7 +59,7 @@ class GraphRegistrationContext(
   }
 
   def registerFlow(flowDef: UnresolvedFlow): Unit = {
-    flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf)
+    flows += flowDef.withSqlConf(defaultSqlConf ++ flowDef.sqlConf)
   }
 
   private def isEmpty: Boolean = {
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
index ade6ce0bad3c..e5ad3de44a8a 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala
@@ -34,6 +34,24 @@ trait GraphValidations extends Logging {
    */
   protected[pipelines] def validateMultiQueryTables(): Map[TableIdentifier, 
Seq[Flow]] = {
     val multiQueryTables = flowsTo.filter(_._2.size > 1)
+
+    // A multiflow table may not have an AutoCDC flow; AutoCDC flow targets 
must be single query.
+    multiQueryTables
+      .find { case (_, flows) => flows.exists(isAutoCdcFlow) }
+      .foreach {
+        case (dest, flows) =>
+          throw new AnalysisException(
+            "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET",
+            Map(
+              "tableName" -> dest.unquotedString,
+              "flows" -> flows
+                .map(_.displayName)
+                .sorted
+                .mkString(", ")
+            )
+          )
+      }
+
     // Non-streaming tables do not support multiflow.
     multiQueryTables
       .find {
@@ -58,6 +76,12 @@ trait GraphValidations extends Logging {
     multiQueryTables
   }
 
+  /** Returns true iff the given flow is an [[AutoCdcFlow]] (resolved or not). 
*/
+  private def isAutoCdcFlow(f: Flow): Boolean = f match {
+    case _: AutoCdcFlow | _: AutoCdcMergeFlow => true
+    case _ => false
+  }
+
   /**
    * Validate that each resolved flow is correctly either a streaming flow or 
non-streaming flow,
    * depending on the flow type (ex. once flow vs non-once flow) and the 
dataset type the flow
@@ -126,8 +150,21 @@ trait GraphValidations extends Logging {
                 )
               }
             case _: TemporaryView =>
-              // Temporary views' flows are allowed to be either streaming or 
batch, so no
-              // validation needs to be done for them
+              // Temporary views' flows are generally allowed to be either 
streaming or batch.
+              resolvedFlow match {
+                case _: AutoCdcMergeFlow =>
+                  // The exception is AutoCDC flows, which require a 
streaming-table sink to
+                  // immediately execute MERGE against.
+                  throw new AnalysisException(
+                    errorClass =
+                      
"INVALID_FLOW_QUERY_TYPE.AUTOCDC_RELATION_FOR_TEMPORARY_VIEW",
+                    messageParameters = Map(
+                      "flowIdentifier" -> resolvedFlow.identifier.quotedString,
+                      "viewIdentifier" -> destTableIdentifier.quotedString
+                    )
+                  )
+                case _ => // OK: any other flow is permitted to target a 
temporary view.
+              }
           }
         }
       }
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
index 829179142dc5..4dfd09693578 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SqlGraphRegistrationContext.scala
@@ -237,7 +237,7 @@ class SqlGraphRegistrationContext(
 
       // Register flow that backs this streaming table.
       graphRegistrationContext.registerFlow(
-        UnresolvedFlow(
+        UntypedFlow(
           identifier = stIdentifier,
           destinationIdentifier = stIdentifier,
           func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cst.query),
@@ -288,7 +288,7 @@ class SqlGraphRegistrationContext(
 
       // Register flow that backs this materialized view.
       graphRegistrationContext.registerFlow(
-        UnresolvedFlow(
+        UntypedFlow(
           identifier = mvIdentifier,
           destinationIdentifier = mvIdentifier,
           func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cmv.query),
@@ -331,7 +331,7 @@ class SqlGraphRegistrationContext(
 
       // Register flow that backs this persisted view.
       graphRegistrationContext.registerFlow(
-        UnresolvedFlow(
+        UntypedFlow(
           identifier = viewIdentifier,
           destinationIdentifier = viewIdentifier,
           func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cv.query),
@@ -375,7 +375,7 @@ class SqlGraphRegistrationContext(
 
       // Register flow definition that backs this temporary view.
       graphRegistrationContext.registerFlow(
-        UnresolvedFlow(
+        UntypedFlow(
           identifier = viewIdentifier,
           destinationIdentifier = viewIdentifier,
           func = FlowAnalysis.createFlowFunctionFromLogicalPlan(cvc.plan),
@@ -451,7 +451,7 @@ class SqlGraphRegistrationContext(
         .identifier
 
       graphRegistrationContext.registerFlow(
-        UnresolvedFlow(
+        UntypedFlow(
           identifier = flowIdentifier,
           destinationIdentifier = qualifiedDestinationIdentifier,
           func = 
FlowAnalysis.createFlowFunctionFromLogicalPlan(flowQueryLogicalPlan),
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
new file mode 100644
index 000000000000..8d365906559b
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import java.util.Locale
+
+import scala.util.Success
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Column, 
QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.graph.{
+  AutoCdcFlow,
+  AutoCdcMergeFlow,
+  FlowFunction,
+  FlowFunctionResult,
+  Input,
+  QueryContext,
+  QueryOrigin
+}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{DataType, IntegerType, LongType, 
StringType, StructField, StructType}
+
+/**
+ * Unit tests for the [[AutoCdcFlow]] and [[AutoCdcMergeFlow]] that do not 
execute graph analysis
+ * or execution.
+ */
+class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
+
+  private val testIdentifier = TableIdentifier("cdc_target", Some("db"))
+
+  /** A no-op [[FlowFunction]] that throws if invoked; AutoCdcFlow tests 
should never call it. */
+  private val noOpFlowFunction: FlowFunction = new FlowFunction {
+    override def call(
+        allInputs: Set[TableIdentifier],
+        availableInputs: Seq[Input],
+        configuration: Map[String, String],
+        queryContext: QueryContext,
+        queryOrigin: QueryOrigin): FlowFunctionResult =
+      throw new UnsupportedOperationException(
+        "noOpFlowFunction.call should not be invoked from AutoCdcFlowSuite 
tests"
+      )
+  }
+
+  private val testQueryContext =
+    QueryContext(currentCatalog = Some("test_catalog"), currentDatabase = 
Some("test_db"))
+
+  private val testChangeArgs = ChangeArgs(
+    keys = Seq(UnqualifiedColumnName("id")),
+    sequencing = F.col("seq"),
+    storedAsScdType = ScdType.Type1
+  )
+
+  private def newAutoCdcFlow(
+      identifier: TableIdentifier = testIdentifier,
+      destinationIdentifier: TableIdentifier = testIdentifier,
+      func: FlowFunction = noOpFlowFunction,
+      queryContext: QueryContext = testQueryContext,
+      sqlConf: Map[String, String] = Map.empty,
+      comment: Option[String] = None,
+      origin: QueryOrigin = QueryOrigin.empty,
+      changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = {
+    AutoCdcFlow(
+      identifier = identifier,
+      destinationIdentifier = destinationIdentifier,
+      func = func,
+      queryContext = queryContext,
+      sqlConf = sqlConf,
+      comment = comment,
+      origin = origin,
+      changeArgs = changeArgs
+    )
+  }
+
+  test("AutoCdcFlow exposes its constructor fields") {
+    val flow = newAutoCdcFlow(
+      sqlConf = Map("spark.sql.shuffle.partitions" -> "8"),
+      comment = Some("my CDC flow")
+    )
+
+    assert(flow.identifier == testIdentifier)
+    assert(flow.destinationIdentifier == testIdentifier)
+    assert(flow.func eq noOpFlowFunction)
+    assert(flow.queryContext == testQueryContext)
+    assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8"))
+    assert(flow.comment.contains("my CDC flow"))
+    assert(flow.origin == QueryOrigin.empty)
+    assert(flow.changeArgs == testChangeArgs)
+  }
+
+  test("AutoCdcFlow defaults sqlConf to empty and comment to None") {
+    // Confirms the case-class default values match the documented contract; 
downstream
+    // registration code relies on `sqlConf` being a non-null empty map by 
default so that
+    // `defaultSqlConf ++ flowDef.sqlConf` is well-defined in 
[[GraphRegistrationContext]].
+    val flow = AutoCdcFlow(
+      identifier = testIdentifier,
+      destinationIdentifier = testIdentifier,
+      func = noOpFlowFunction,
+      queryContext = testQueryContext,
+      origin = QueryOrigin.empty,
+      changeArgs = testChangeArgs
+    )
+
+    assert(flow.sqlConf.isEmpty)
+    assert(flow.comment.isEmpty)
+  }
+
+  test("AutoCdcFlow.once is always false") {
+    // AutoCDC flows are streaming-only and must run on every batch trigger, 
never as a
+    // one-shot full-refresh-style flow. Locking this in so a future refactor 
doesn't
+    // accidentally make `once` configurable.
+
+    // In the future we may intentionally add [[once]] support for AutoCDC 
flows, at which point
+    // this test can safely be removed.
+    val flow = newAutoCdcFlow()
+    assert(!flow.once)
+  }
+
+  test("AutoCdcFlow.withSqlConf returns a new instance with the updated 
sqlConf") {
+    val original = newAutoCdcFlow(sqlConf = Map("a" -> "1"))
+    val updated = original.withSqlConf(Map("b" -> "2"))
+
+    assert(updated.sqlConf == Map("b" -> "2"))
+    // All other fields should be preserved verbatim.
+    assert(updated.identifier == original.identifier)
+    assert(updated.destinationIdentifier == original.destinationIdentifier)
+    assert(updated.func eq original.func)
+    assert(updated.queryContext == original.queryContext)
+    assert(updated.comment == original.comment)
+    assert(updated.origin == original.origin)
+    assert(updated.changeArgs == original.changeArgs)
+    // The original must not be mutated.
+    assert(original.sqlConf == Map("a" -> "1"))
+  }
+
+  // 
===========================================================================================
+  // AutoCdcMergeFlow.schema tests
+  // 
===========================================================================================
+
+  /** Materializes a successful [[FlowFunctionResult]] backed by the given 
source dataframe. */
+  private def successfulFuncResult(sourceDf: DataFrame): FlowFunctionResult =
+    FlowFunctionResult(
+      requestedInputs = Set.empty,
+      batchInputs = Set.empty,
+      streamingInputs = Set.empty,
+      usedExternalInputs = Set.empty,
+      dataFrame = Success(sourceDf),
+      sqlConf = Map.empty
+    )
+
+  /** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change 
args. */
+  private def newAutoCdcMergeFlow(
+      sourceDf: DataFrame,
+      keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")),
+      sequencing: Column = F.col("seq"),
+      storedAsScdType: ScdType = ScdType.Type1,
+      columnSelection: Option[ColumnSelection] = None): AutoCdcMergeFlow = {
+    val flow = newAutoCdcFlow(
+      changeArgs = ChangeArgs(
+        keys = keys,
+        sequencing = sequencing,
+        storedAsScdType = storedAsScdType,
+        columnSelection = columnSelection
+      )
+    )
+    new AutoCdcMergeFlow(flow, successfulFuncResult(sourceDf))
+  }
+
+  /** A stable 3-column source CDF schema used across most schema tests. */
+  private def threeColumnSourceDf(): DataFrame = {
+    val schema = new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("name", StringType)
+      .add("seq", LongType)
+    
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], 
schema)
+  }
+
+  /** Convenience to extract the [[StructType]] of the projected 
`_cdc_metadata` column. */
+  private def cdcMetadataStruct(schema: StructType): StructType =
+    
schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+
+  test(
+    "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when 
no " +
+    "columnSelection is set"
+  ) {
+    val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+    val expected = new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("name", StringType)
+      .add("seq", LongType)
+      .add(
+        StructField(
+          Scd1BatchProcessor.cdcMetadataColName,
+          Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+          nullable = false
+        )
+      )
+    assert(resolvedFlow.schema == expected)
+  }
+
+  test("AutoCdcMergeFlow.schema applies an IncludeColumns selection") {
+    val resolvedFlow = newAutoCdcMergeFlow(
+      sourceDf = threeColumnSourceDf(),
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq"))
+        )
+      )
+    )
+
+    val expected = new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("seq", LongType)
+      .add(
+        StructField(
+          Scd1BatchProcessor.cdcMetadataColName,
+          Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+          nullable = false
+        )
+      )
+    assert(resolvedFlow.schema == expected)
+  }
+
+  test("AutoCdcMergeFlow.schema applies an ExcludeColumns selection") {
+    val resolvedFlow = newAutoCdcMergeFlow(
+      sourceDf = threeColumnSourceDf(),
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+      )
+    )
+
+    val expected = new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("seq", LongType)
+      .add(
+        StructField(
+          Scd1BatchProcessor.cdcMetadataColName,
+          Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+          nullable = false
+        )
+      )
+    assert(resolvedFlow.schema == expected)
+  }
+
+  test(
+    "AutoCdcMergeFlow.schema's _cdc_metadata struct uses the resolved 
sequencing data type"
+  ) {
+    // Source has a Long `seq` column; sequencing is `cast(seq as int)`, so 
the projected
+    // `_cdc_metadata` fields should be Int (not Long), demonstrating that the 
sequencing
+    // expression's *resolved* type drives the metadata schema.
+    val resolvedFlow = newAutoCdcMergeFlow(
+      sourceDf = threeColumnSourceDf(),
+      sequencing = F.col("seq").cast(IntegerType)
+    )
+
+    val metaStruct = cdcMetadataStruct(resolvedFlow.schema)
+    assert(metaStruct == Scd1BatchProcessor.cdcMetadataColSchema(IntegerType))
+  }
+
+  test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with 
nullable inner fields") {
+    val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+    val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName)
+    assert(!metaField.nullable, "_cdc_metadata column itself must be non-null")
+
+    val metaStruct = metaField.dataType.asInstanceOf[StructType]
+    assert(metaStruct(Scd1BatchProcessor.cdcDeleteSequenceFieldName).nullable)
+    assert(metaStruct(Scd1BatchProcessor.cdcUpsertSequenceFieldName).nullable)
+  }
+
+  test("AutoCdcMergeFlow.schema is stable across reads") {
+    // The schema computation calls `df.select(sequencing).schema`, which 
triggers Spark
+    // analysis. The eagerly-initialized `val` caches the result so downstream 
consumers get
+    // a stable schema instance across reads.
+    val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+    val first = resolvedFlow.schema
+    val second = resolvedFlow.schema
+    assert(first eq second, "schema should be cached as a val and return the 
same instance")
+  }
+
+  test("AutoCdcMergeFlow rejects SCD2 at construction with 
AUTOCDC_SCD2_NOT_SUPPORTED") {
+    // Constructing the flow forces the resolved schema, which is unsupported 
for SCD2 today.
+    // Failing eagerly (rather than deferring to the first downstream `schema` 
read) is the
+    // intended UX -- pipeline graph analysis should not be able to register 
an SCD2 AutoCDC
+    // flow at all.
+    checkError(
+      exception = intercept[AnalysisException] {
+        newAutoCdcMergeFlow(
+          sourceDf = threeColumnSourceDf(),
+          storedAsScdType = ScdType.Type2
+        )
+      },
+      condition = "AUTOCDC_SCD2_NOT_SUPPORTED",
+      sqlState = "0A000",
+      parameters = Map.empty
+    )
+  }
+
+  // 
===========================================================================================
+  // AutoCdcMergeFlow.load() contract tests
+  // 
===========================================================================================
+
+  test("AutoCdcMergeFlow.load() schema matches AutoCdcMergeFlow.schema") {
+    val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+    val loadedDf = resolvedFlow.load(readOptions = null)
+    assert(loadedDf.schema == resolvedFlow.schema)
+  }
+
+  test("AutoCdcMergeFlow.load() respects an IncludeColumns selection") {
+    val resolvedFlow = newAutoCdcMergeFlow(
+      sourceDf = threeColumnSourceDf(),
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq"))
+        )
+      )
+    )
+    val loadedDf = resolvedFlow.load(readOptions = null)
+    assert(loadedDf.schema == resolvedFlow.schema)
+    // The user-selected portion drops `name`; the trailing column is the SCD1 
metadata.
+    assert(
+      loadedDf.schema.fieldNames.toSeq ==
+      Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+    )
+  }
+
+  test("AutoCdcMergeFlow.load() respects an ExcludeColumns selection") {
+    val resolvedFlow = newAutoCdcMergeFlow(
+      sourceDf = threeColumnSourceDf(),
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+      )
+    )
+    val loadedDf = resolvedFlow.load(readOptions = null)
+    assert(loadedDf.schema == resolvedFlow.schema)
+    assert(
+      loadedDf.schema.fieldNames.toSeq ==
+      Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+    )
+  }
+
+  test("AutoCdcMergeFlow.load() materializes the CDC metadata column as 
null-filled") {
+    // The merge engine fills in the metadata at execution time; at planning 
time we synthesize
+    // a typed null placeholder so that load().schema matches schema. This 
test pins down the
+    // placeholder shape: outer struct non-null, inner fields null-filled.
+    val schema = new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("name", StringType)
+      .add("seq", LongType)
+    val sourceRows = java.util.Arrays.asList(
+      org.apache.spark.sql.Row(1, "a", 100L),
+      org.apache.spark.sql.Row(2, "b", 200L)
+    )
+    val sourceDf = spark.createDataFrame(sourceRows, schema)
+    val resolvedFlow = newAutoCdcMergeFlow(sourceDf)
+
+    val loadedDf = resolvedFlow.load(readOptions = null)
+    val collected = loadedDf.collect()
+    assert(collected.length == 2)
+
+    val metaIdx = 
loadedDf.schema.fieldIndex(Scd1BatchProcessor.cdcMetadataColName)
+    collected.foreach { row =>
+      assert(!row.isNullAt(metaIdx), "_cdc_metadata struct itself should be 
non-null")
+      val metaRow = row.getStruct(metaIdx)
+      assert(metaRow.isNullAt(0), "deleteSequence placeholder should be null")
+      assert(metaRow.isNullAt(1), "upsertSequence placeholder should be null")
+    }
+  }
+
+  // 
===========================================================================================
+  // AutoCdcMergeFlow reserved-prefix validation tests
+  //
+  // The two "contract:" tests below lock in the high-level invariant that no 
reserved-prefix
+  // column name can be referenced anywhere -- not in the source change-data 
feed schema, and
+  // not in user-supplied [[ChangeArgs]] (keys or columnSelection). Together 
they ensure that
+  // (a) users cannot opt out of the reserved CDC metadata column by omitting 
it from the
+  // selected schema, and (b) users cannot opt in to (or out of) any other 
reserved-prefix
+  // name we may reserve in the future for an internal CDC concern.
+  //
+  // The remaining tests pin down case-sensitivity nuances of the 
source-schema validator.
+  // 
===========================================================================================
+
+  /** Builds an empty source df with `id` + `seq` + the supplied extra 
columns. */
+  private def sourceDfWithExtraColumns(extraColumns: (String, DataType)*): 
DataFrame = {
+    val schema = extraColumns.foldLeft(
+      new StructType().add("id", IntegerType, nullable = false).add("seq", 
LongType)
+    ) { case (acc, (name, dt)) => acc.add(name, dt) }
+    
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], 
schema)
+  }
+
+  test(
+    "Contract: a source df column with the reserved AutoCDC prefix is rejected 
at flow " +
+    "construction"
+  ) {
+    val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+    val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        newAutoCdcMergeFlow(sourceDf)
+      },
+      condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+      sqlState = "42710",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "columnName" -> conflictingName,
+        "schemaName" -> "changeDataFeed",
+        "reservedColumnNamePrefix" -> 
Scd1BatchProcessor.reservedColumnNamePrefix
+      )
+    )
+  }
+
+  test(
+    "Contract: ChangeArgs referencing a reserved-prefix column is rejected 
even when the " +
+    "source df is clean"
+  ) {
+    // The source df has no reserved-prefix columns, but referencing a 
reserved-prefix column
+    // from any ChangeArgs path still fails at construction with a different 
error. The
+    // reservation is on the name itself, not on its presence in the source 
feed.
+    val cleanSourceDf = threeColumnSourceDf()
+    val reservedName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+
+    val keysEx = intercept[AnalysisException] {
+      newAutoCdcMergeFlow(
+        sourceDf = cleanSourceDf,
+        keys = Seq(UnqualifiedColumnName(reservedName))
+      )
+    }
+    assert(keysEx.getCondition == "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA")
+
+    val includeEx = intercept[AnalysisException] {
+      newAutoCdcMergeFlow(
+        sourceDf = cleanSourceDf,
+        columnSelection = Some(
+          ColumnSelection.IncludeColumns(
+            Seq(UnqualifiedColumnName("id"), 
UnqualifiedColumnName(reservedName))
+          )
+        )
+      )
+    }
+    assert(includeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA")
+
+    val excludeEx = intercept[AnalysisException] {
+      newAutoCdcMergeFlow(
+        sourceDf = cleanSourceDf,
+        columnSelection = Some(
+          
ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName(reservedName)))
+        )
+      )
+    }
+    assert(excludeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA")
+  }
+
+  test(
+    "AutoCdcMergeFlow rejects a source df column whose name equals the 
reserved CDC " +
+    "metadata column"
+  ) {
+    // Locks in the previous engine-level guard 
(Scd1BatchProcessor.extendMicrobatchRowsWith
+    // CdcMetadata) at flow-construction time. Any future regression where a 
user-supplied
+    // CDC stream carries the reserved metadata column name should fail 
eagerly here.
+    val sourceDf = 
sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        newAutoCdcMergeFlow(sourceDf)
+      },
+      condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+      sqlState = "42710",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
+        "schemaName" -> "changeDataFeed",
+        "reservedColumnNamePrefix" -> 
Scd1BatchProcessor.reservedColumnNamePrefix
+      )
+    )
+  }
+
+  test(
+    "AutoCdcMergeFlow rejects an uppercase reserved-prefix column when 
caseSensitive=false"
+  ) {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val conflictingName =
+        
s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+      val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          newAutoCdcMergeFlow(sourceDf)
+        },
+        condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+        sqlState = "42710",
+        parameters = Map(
+          "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+          "columnName" -> conflictingName,
+          "schemaName" -> "changeDataFeed",
+          "reservedColumnNamePrefix" -> 
Scd1BatchProcessor.reservedColumnNamePrefix
+        )
+      )
+    }
+  }
+
+  test(
+    "AutoCdcMergeFlow allows an uppercase reserved-prefix column when 
caseSensitive=true"
+  ) {
+    // Under case-sensitive analysis, the uppercase variant is a distinct 
identifier and does
+    // not collide with the lowercase reserved namespace. Locks in that the 
validation respects
+    // `spark.sql.caseSensitive`, consistent with the schema-augmentation 
logic in this class.
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val nonConflictingName =
+        
s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+      val sourceDf = sourceDfWithExtraColumns(nonConflictingName -> StringType)
+
+      // No exception expected: construction succeeds.
+      newAutoCdcMergeFlow(sourceDf)
+    }
+  }
+
+  // 
===========================================================================================
+  // AutoCdcMergeFlow keys-presence validation tests 
(requireKeysPresentInSelectedSchema)
+  // 
===========================================================================================
+
+  test("AutoCdcMergeFlow rejects a key that is not present in the source 
change-data feed") {
+    // No columnSelection: the post-selection schema equals the source schema. 
The key `id`
+    // is absent from the source df entirely, so the validator must surface a 
CDC-specific
+    // error rather than deferring to Spark's generic UNRESOLVED_COLUMN.
+    val schema = new StructType()
+      .add("name", StringType)
+      .add("seq", LongType)
+    val sourceDf =
+      
spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], 
schema)
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        newAutoCdcMergeFlow(sourceDf)
+      },
+      condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+      sqlState = "22023",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "keyColumnName" -> "id"
+      )
+    )
+  }
+
+  test("AutoCdcMergeFlow rejects a key dropped by an IncludeColumns 
selection") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        newAutoCdcMergeFlow(
+          sourceDf = threeColumnSourceDf(),
+          columnSelection = Some(
+            ColumnSelection.IncludeColumns(
+              Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("seq"))
+            )
+          )
+        )
+      },
+      condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+      sqlState = "22023",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "keyColumnName" -> "id"
+      )
+    )
+  }
+
+  test("AutoCdcMergeFlow rejects a key dropped by an ExcludeColumns 
selection") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        newAutoCdcMergeFlow(
+          sourceDf = threeColumnSourceDf(),
+          columnSelection = Some(
+            ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id")))
+          )
+        )
+      },
+      condition = "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA",
+      sqlState = "22023",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "keyColumnName" -> "id"
+      )
+    )
+  }
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index c78dc123621b..9432150c4016 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.pipelines.autocdc
 
-import java.util.Locale
-
 import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.classic.DataFrame
 import org.apache.spark.sql.internal.SQLConf
@@ -671,81 +669,6 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION")
   }
 
-  test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already 
contains the " +
-    "reserved CDC metadata column") {
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-      val schema = new StructType()
-        .add("id", IntegerType)
-        .add("seq", LongType)
-        .add(Scd1BatchProcessor.cdcMetadataColName, StringType)
-
-      val batch = microbatchOf(schema)(
-        Row(1, 10L, "user-supplied")
-      )
-
-      val processor = Scd1BatchProcessor(
-        changeArgs = ChangeArgs(
-          keys = Seq(UnqualifiedColumnName("id")),
-          sequencing = F.col("seq"),
-          storedAsScdType = ScdType.Type1
-        ),
-        resolvedSequencingType = LongType
-      )
-
-      checkError(
-        exception = intercept[AnalysisException] {
-          processor.extendMicrobatchRowsWithCdcMetadata(batch)
-        },
-        condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
-        sqlState = "42710",
-        parameters = Map(
-          "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
-          "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
-          "schemaName" -> "microbatch",
-          "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
-        )
-      )
-    }
-  }
-
-  test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata 
column " +
-    "case-insensitively") {
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
-      val conflictingColumnName = 
Scd1BatchProcessor.cdcMetadataColName.toUpperCase(Locale.ROOT)
-      val schema = new StructType()
-        .add("id", IntegerType)
-        .add("seq", LongType)
-        .add(conflictingColumnName, StringType)
-
-      val batch = microbatchOf(schema)(
-        Row(1, 10L, "user-supplied")
-      )
-
-      val processor = Scd1BatchProcessor(
-        changeArgs = ChangeArgs(
-          keys = Seq(UnqualifiedColumnName("id")),
-          sequencing = F.col("seq"),
-          storedAsScdType = ScdType.Type1
-        ),
-        resolvedSequencingType = LongType
-      )
-
-      checkError(
-        exception = intercept[AnalysisException] {
-          processor.extendMicrobatchRowsWithCdcMetadata(batch)
-        },
-        condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
-        sqlState = "42710",
-        parameters = Map(
-          "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
-          "columnName" -> conflictingColumnName,
-          "schemaName" -> "microbatch",
-          "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
-        )
-      )
-    }
-  }
-
   test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC 
metadata column " +
     "when columnSelection is None") {
     val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index f37716b4a24d..f19fed4e5780 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.pipelines.graph
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ScdType, 
UnqualifiedColumnName}
 import org.apache.spark.sql.pipelines.utils.{PipelineTest, 
TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
@@ -547,4 +548,220 @@ class ConnectInvalidPipelineSuite extends PipelineTest 
with SharedSparkSession {
     assert(!ex1.getMessage.contains(streamingTableHint))
     assert(ex2.getMessage.contains(streamingTableHint))
   }
+
+  test(
+    "AutoCDC flow targeting a materialized view fails with " +
+    "STREAMING_RELATION_FOR_MATERIALIZED_VIEW"
+  ) {
+    val session = spark
+    import session.implicits._
+
+    val graph = new TestGraphRegistrationContext(spark) {
+      val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id", 
$"value" as "seq")
+      registerTable(
+        Table(
+          identifier = fullyQualifiedIdentifier("target"),
+          comment = None,
+          specifiedSchema = None,
+          partitionCols = None,
+          clusterCols = None,
+          properties = Map.empty,
+          origin = QueryOrigin.empty,
+          format = Some("parquet"),
+          normalizedPath = None,
+          isStreamingTable = false
+        )
+      )
+      registerFlow(
+        AutoCdcFlow(
+          identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+          destinationIdentifier = fullyQualifiedIdentifier("target"),
+          func = dfFlowFunc(cdcEvents),
+          queryContext = QueryContext(
+            currentCatalog = 
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+            currentDatabase = 
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+          ),
+          origin = QueryOrigin.empty,
+          changeArgs = ChangeArgs(
+            keys = Seq(UnqualifiedColumnName("id")),
+            sequencing = $"seq",
+            storedAsScdType = ScdType.Type1
+          )
+        )
+      )
+    }.resolveToDataflowGraph()
+
+    val ex = intercept[AnalysisException] {
+      graph.validate()
+    }
+
+    checkError(
+      exception = ex,
+      condition = 
"INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_MATERIALIZED_VIEW",
+      parameters = Map(
+        "flowIdentifier" -> 
fullyQualifiedIdentifier("auto_cdc_flow").quotedString,
+        "tableIdentifier" -> fullyQualifiedIdentifier("target").quotedString
+      )
+    )
+  }
+
+  test(
+    "AutoCDC flow targeting a persisted view fails with 
STREAMING_RELATION_FOR_PERSISTED_VIEW"
+  ) {
+    val session = spark
+    import session.implicits._
+
+    val graph = new TestGraphRegistrationContext(spark) {
+      val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id", 
$"value" as "seq")
+      registerView(
+        PersistedView(
+          identifier = fullyQualifiedIdentifier("target_view"),
+          properties = Map.empty,
+          sqlText = None,
+          comment = None,
+          origin = QueryOrigin.empty
+        )
+      )
+      registerFlow(
+        AutoCdcFlow(
+          identifier = fullyQualifiedIdentifier("target_view"),
+          destinationIdentifier = fullyQualifiedIdentifier("target_view"),
+          func = dfFlowFunc(cdcEvents),
+          queryContext = QueryContext(
+            currentCatalog = 
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+            currentDatabase = 
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+          ),
+          origin = QueryOrigin.empty,
+          changeArgs = ChangeArgs(
+            keys = Seq(UnqualifiedColumnName("id")),
+            sequencing = $"seq",
+            storedAsScdType = ScdType.Type1
+          )
+        )
+      )
+    }.resolveToDataflowGraph()
+
+    val ex = intercept[AnalysisException] {
+      graph.validate()
+    }
+
+    checkError(
+      exception = ex,
+      condition = 
"INVALID_FLOW_QUERY_TYPE.STREAMING_RELATION_FOR_PERSISTED_VIEW",
+      parameters = Map(
+        "flowIdentifier" -> 
fullyQualifiedIdentifier("target_view").quotedString,
+        "viewIdentifier" -> 
fullyQualifiedIdentifier("target_view").quotedString
+      )
+    )
+  }
+
+  test(
+    "AutoCDC flow targeting a temporary view fails with 
AUTOCDC_RELATION_FOR_TEMPORARY_VIEW"
+  ) {
+    // Temporary views in SDP normally accept either streaming or batch 
producing flows, but
+    // AutoCDC flows are an explicit exception: SCD reconciliation only runs 
at the
+    // streaming-table sink (`Scd1ForeachBatchExec`), so pointing an AutoCDC 
flow at a view
+    // would silently drop reconciliation and expose just the projected CDF to 
consumers.
+    // `validateFlowStreamingness` rejects this case with a dedicated 
sub-condition under
+    // INVALID_FLOW_QUERY_TYPE.
+    val session = spark
+    import session.implicits._
+
+    val graph = new TestGraphRegistrationContext(spark) {
+      val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id", 
$"value" as "seq")
+      // A pipeline must contain at least one non-temporary dataset; register 
an unrelated
+      // streaming table so the pipeline is non-empty and we can exercise the 
AutoCDC path.
+      registerTable(
+        "dummy_table",
+        query = Some(dfFlowFunc(MemoryStream[Int].toDF()))
+      )
+      registerView(
+        TemporaryView(
+          identifier = fullyQualifiedIdentifier("target_view"),
+          properties = Map.empty,
+          sqlText = None,
+          comment = None,
+          origin = QueryOrigin.empty
+        )
+      )
+      registerFlow(
+        AutoCdcFlow(
+          identifier = fullyQualifiedIdentifier("target_view"),
+          destinationIdentifier = fullyQualifiedIdentifier("target_view"),
+          func = dfFlowFunc(cdcEvents),
+          queryContext = QueryContext(
+            currentCatalog = 
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+            currentDatabase = 
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+          ),
+          origin = QueryOrigin.empty,
+          changeArgs = ChangeArgs(
+            keys = Seq(UnqualifiedColumnName("id")),
+            sequencing = $"seq",
+            storedAsScdType = ScdType.Type1
+          )
+        )
+      )
+    }.resolveToDataflowGraph()
+
+    val ex = intercept[AnalysisException] {
+      graph.validate()
+    }
+
+    checkError(
+      exception = ex,
+      condition = 
"INVALID_FLOW_QUERY_TYPE.AUTOCDC_RELATION_FOR_TEMPORARY_VIEW",
+      parameters = Map(
+        "flowIdentifier" -> 
fullyQualifiedIdentifier("target_view").quotedString,
+        "viewIdentifier" -> 
fullyQualifiedIdentifier("target_view").quotedString
+      )
+    )
+  }
+
+  test("A multiquery table cannot have an AutoCDC query input") {
+    val session = spark
+    import session.implicits._
+
+    val graph = new TestGraphRegistrationContext(spark) {
+      val cdcEvents = MemoryStream[Int].toDF().select($"value" as "id", 
$"value" as "seq")
+      registerTable("target")
+      registerFlow(
+        AutoCdcFlow(
+          identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+          destinationIdentifier = fullyQualifiedIdentifier("target"),
+          func = dfFlowFunc(cdcEvents),
+          queryContext = QueryContext(
+            currentCatalog = 
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+            currentDatabase = 
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+          ),
+          origin = QueryOrigin.empty,
+          changeArgs = ChangeArgs(
+            keys = Seq(UnqualifiedColumnName("id")),
+            sequencing = $"seq",
+            storedAsScdType = ScdType.Type1
+          )
+        )
+      )
+      registerFlow(
+        destinationName = "target",
+        name = "extra_flow",
+        query = dfFlowFunc(MemoryStream[Int].toDF().select($"value" as "id", 
$"value" as "seq"))
+      )
+    }.resolveToDataflowGraph()
+
+    val ex = intercept[AnalysisException] {
+      graph.validate()
+    }
+
+    checkError(
+      exception = ex,
+      condition = "AUTOCDC_MULTIPLE_FLOWS_TO_TARGET",
+      parameters = Map(
+        "tableName" -> fullyQualifiedIdentifier("target").unquotedString,
+        "flows" -> Seq(
+          fullyQualifiedIdentifier("auto_cdc_flow").unquotedString,
+          fullyQualifiedIdentifier("extra_flow").unquotedString
+        ).sorted.mkString(", ")
+      )
+    )
+  }
 }
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
index 3ac3c0901750..3c7db2cca889 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectValidPipelineSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.Union
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.pipelines.autocdc.{ChangeArgs, ScdType, 
UnqualifiedColumnName}
 import org.apache.spark.sql.pipelines.utils.{PipelineTest, 
TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -509,6 +510,38 @@ class ConnectValidPipelineSuite extends PipelineTest with 
SharedSparkSession {
     assert(g.flow(TableIdentifier("sink_flow")).isInstanceOf[StreamingFlow])
   }
 
+  test("AutoCdcFlow registers and resolves to AutoCdcMergeFlow") {
+    val session = spark
+    import session.implicits._
+
+    val P = new TestGraphRegistrationContext(spark) {
+      val mem = MemoryStream[Int]
+      val cdcEvents = mem.toDF().select($"value" as "id", $"value" as "seq")
+      registerTable("target")
+      registerFlow(
+        AutoCdcFlow(
+          identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+          destinationIdentifier = fullyQualifiedIdentifier("target"),
+          func = dfFlowFunc(cdcEvents),
+          queryContext = QueryContext(
+            currentCatalog = 
Some(TestGraphRegistrationContext.DEFAULT_CATALOG),
+            currentDatabase = 
Some(TestGraphRegistrationContext.DEFAULT_DATABASE)
+          ),
+          origin = QueryOrigin.empty,
+          changeArgs = ChangeArgs(
+            keys = Seq(UnqualifiedColumnName("id")),
+            sequencing = $"seq",
+            storedAsScdType = ScdType.Type1
+          )
+        )
+      )
+    }
+    val g = P.resolveToDataflowGraph()
+    assert(
+      
g.flow(fullyQualifiedIdentifier("auto_cdc_flow")).isInstanceOf[AutoCdcMergeFlow]
+    )
+  }
+
   /** Verifies the [[DataflowGraph]] has the specified [[Flow]] with the 
specified schema. */
   private def verifyFlowSchema(
       pipeline: DataflowGraph,
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
index 9ff92ee895b1..f5bdf87a6cc6 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/utils/TestGraphRegistrationContext.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{LocalTempView, PersistedView => 
PersistedViewType, UnresolvedRelation, ViewType}
 import org.apache.spark.sql.classic.{DataFrame, SparkSession}
-import org.apache.spark.sql.pipelines.graph.{DataflowGraph, FlowAnalysis, 
FlowFunction, GraphIdentifierManager, GraphRegistrationContext, PersistedView, 
QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, Table, 
TemporaryView, UnresolvedFlow}
+import org.apache.spark.sql.pipelines.graph.{DataflowGraph, FlowAnalysis, 
FlowFunction, GraphIdentifierManager, GraphRegistrationContext, PersistedView, 
QueryContext, QueryOrigin, QueryOriginType, Sink, SinkImpl, Table, 
TemporaryView, UntypedFlow}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -176,7 +176,7 @@ class TestGraphRegistrationContext(
 
     if (query.isDefined) {
       registerFlow(
-        new UnresolvedFlow(
+        UntypedFlow(
           identifier = qualifiedIdentifier,
           destinationIdentifier = qualifiedIdentifier,
           func = query.get,
@@ -267,7 +267,7 @@ class TestGraphRegistrationContext(
     )
 
     registerFlow(
-      new UnresolvedFlow(
+      UntypedFlow(
         identifier = viewIdentifier,
         destinationIdentifier = viewIdentifier,
         func = query,
@@ -339,7 +339,7 @@ class TestGraphRegistrationContext(
       }
 
     registerFlow(
-      new UnresolvedFlow(
+      UntypedFlow(
         identifier = flowIdentifier,
         destinationIdentifier = flowDestinationIdentifier,
         func = query,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to