This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 7ed7fe52f43f [SPARK-56957][SDP] AutoCDC Flow Execution; Introduce and 
Integrate SCD1 `Scd1MergeStreamingWrite`
7ed7fe52f43f is described below

commit 7ed7fe52f43f4faa05238bb85a3799d07ddfd05f
Author: AnishMahto <[email protected]>
AuthorDate: Wed May 27 22:50:38 2026 +0800

    [SPARK-56957][SDP] AutoCDC Flow Execution; Introduce and Integrate SCD1 
`Scd1MergeStreamingWrite`
    
    ### What changes were proposed in this pull request?
    In order for a pipeline to actually execute an AutoCDC SCD1 flow, the SDP 
engine needs to have a "physical" flow definition that defines what streaming 
transformation must be done for SCD1, and how to construct this physical flow 
given a "logical" flow.
    
    The `FlowPlanner` is responsible for converting a resolved SCD1 streaming 
logical flow into the SCD1 streaming physical flow. The physical flow 
implements the SCD1 `foreachBatch` streaming query on the flow input.
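    
    To illustrate what the per-microbatch SCD1 reconciliation amounts to, here is a simplified in-memory model. It is a sketch under stated assumptions, not the Spark implementation: `ChangeEvent`, `apply_microbatch`, and the `target`/`tombstones` dictionaries are hypothetical names, and the strict "greater sequence wins" rule is inferred from the comparisons in `Scd1BatchProcessor`.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class ChangeEvent:
    """Hypothetical CDC event; `values=None` marks a delete."""
    key: str
    sequence: int
    values: Optional[Dict[str, Any]]


def apply_microbatch(
    target: Dict[str, Tuple[int, Dict[str, Any]]],
    tombstones: Dict[str, int],
    events: List[ChangeEvent],
) -> None:
    """Apply one microbatch with SCD1 (latest-wins) semantics, in place.

    `target` stands in for the target table (key -> (sequence, row)) and
    `tombstones` for the auxiliary table (key -> delete sequence).
    """
    # Step 1: keep only the winning (highest-sequence) event per key.
    winners: Dict[str, ChangeEvent] = {}
    for event in events:
        current = winners.get(event.key)
        if current is None or event.sequence > current.sequence:
            winners[event.key] = event

    # Step 2: merge each winner, skipping events that are not strictly newer
    # than what was already recorded for that key.
    for key, event in winners.items():
        seen = []
        if key in target:
            seen.append(target[key][0])
        if key in tombstones:
            seen.append(tombstones[key])
        if seen and event.sequence <= max(seen):
            continue  # stale or duplicate event: no-op

        if event.values is None:
            # Delete: remove the row (if any) and remember the delete sequence.
            target.pop(key, None)
            tombstones[key] = event.sequence
        else:
            # Upsert: overwrite the whole row for this key.
            target[key] = (event.sequence, dict(event.values))
```

    In this model, a delete for a key that is absent from the target leaves the target untouched and only records a tombstone, so a late upsert with a lower sequence is ignored afterwards.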
    
    Integrating the physical flow unblocks pipeline execution with AutoCDC 
flows, which means we also need to fill gaps in auxiliary/target table 
management, schema evolution, inter-pipeline validation, etc.
    
    One validation these changes intentionally do not include is validating 
against a changing key-set across pipeline invocations - that requires more 
design and will be handled in a separate PR.
    
    ### Why are the changes needed?
    To actually execute an AutoCDC SCD1 flow transformation in an SDP pipeline. 
Before this change, if an AutoCDC flow was registered with the graph, the graph 
analysis engine would throw an unrecognized flow exception.
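    
    The planning decision described here can be sketched as a small dispatch function. This is only an illustration that mirrors the shape of the `FlowPlanner` change; every class and function name below (`AutoCdcFlow`, `Scd1PhysicalFlow`, `plan`, `UnsupportedFlowError`) is a hypothetical stand-in, not part of the SDP API.

```python
from dataclasses import dataclass


class UnsupportedFlowError(Exception):
    """Hypothetical error raised when a flow cannot be planned."""


@dataclass
class AutoCdcFlow:
    name: str
    scd_type: str  # "SCD1" or "SCD2", matching the labels on ScdType


@dataclass
class Table:
    name: str


@dataclass
class Sink:
    name: str


@dataclass
class Scd1PhysicalFlow:
    flow: AutoCdcFlow
    destination: Table


def plan(flow: object, output: object) -> Scd1PhysicalFlow:
    """Map a resolved logical flow to a physical flow, or fail loudly."""
    if not isinstance(flow, AutoCdcFlow):
        # What happened to AutoCDC flows before this change: no matching case.
        raise UnsupportedFlowError(f"Unrecognized flow type: {type(flow).__name__}")
    if flow.scd_type == "SCD1":
        if not isinstance(output, Table):
            raise UnsupportedFlowError(
                f"Unsupported destination type: {type(output).__name__} "
                f"for flow {flow.name}"
            )
        return Scd1PhysicalFlow(flow=flow, destination=output)
    # SCD2 is recognized but not executable yet.
    raise UnsupportedFlowError("AUTOCDC_SCD2_NOT_SUPPORTED")
```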
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    With these changes, we can now actually test how an AutoCDC flow interacts 
with the rest of SDP. This unlocks a number of features/integrations that we 
should test, such as:
    - Schema evolution
    - Writing to different pipeline dataset types
    - Full refresh semantics
    - Multiflow, multipipeline, for the same AutoCDC target
    - Executing AutoCDC flows over multiple independent pipeline runs
    - etc.
    
    As such I added a new `AutoCdcGraphExecutionTestMixin` providing a 
v2-row-level-ops-capable catalog 
(`SharedTablesInMemoryRowLevelOperationTableCatalog`) and the standard 
fixtures all AutoCDC E2E suites share, plus six new end-to-end suites 
(~30 tests) covering:
    
    - `AutoCdcScd1SinglePipelineSuite` — basic upsert/delete reconciliation in 
a single pipeline, plus the `AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE` failure 
path.
    - `AutoCdcScd1MultiPipelineSuite` — independent targets across pipelines, 
downstream readers of an AutoCDC target.
    - `AutoCdcScd1FullRefreshSuite` — full refresh wipes target rows + aux 
table; sequence comparisons reset; selective refresh isolates state.
    - `AutoCdcScd1SchemaEvolutionSuite` — broadening/narrowing column 
selection, nullable column addition, type widening/narrowing, nested 
struct/array evolution, case-only collisions, etc.
    - `AutoCdcScd1AuxiliaryTableDurabilitySuite` — keys-first invariant, 
declared key order preserved, multi-run sequence comparisons, transparent 
aux-table recreation if dropped.
    - `AutoCdcScd1TargetTableDurabilitySuite` — pre-loaded target rows, 
late-added CDC metadata column.
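    
    As a rough illustration of the full-refresh behavior the suites above exercise, the sketch below derives the auxiliary table name from a target identifier and lists the statements a full refresh would issue. The `__spark_autocdc_` prefix and the `aux_state_` infix come from this change; the helper names are hypothetical, the backtick quoting is a simplification, and truncating the target on full refresh is an assumption based on the surrounding `DatasetManager` context.

```python
from typing import List

# Reserved prefix introduced by AutoCdcReservedNames in this change.
RESERVED_PREFIX = "__spark_autocdc_"


def quote(part: str) -> str:
    """Backtick-quote one identifier part, escaping embedded backticks."""
    return "`" + part.replace("`", "``") + "`"


def aux_table_parts(target_parts: List[str]) -> List[str]:
    """Same catalog/namespace as the target; only the table name changes."""
    *namespace, table = target_parts
    return [*namespace, f"{RESERVED_PREFIX}aux_state_{table}"]


def full_refresh_statements(target_parts: List[str]) -> List[str]:
    """SQL a full refresh would run: wipe target rows, drop auxiliary state."""
    target = ".".join(quote(p) for p in target_parts)
    aux = ".".join(quote(p) for p in aux_table_parts(target_parts))
    return [
        f"TRUNCATE TABLE {target}",
        # DROP rather than TRUNCATE so the auxiliary table is recreated
        # against the new target schema on the next run.
        f"DROP TABLE IF EXISTS {aux}",
    ]
```

    Because the auxiliary table is created with `CREATE TABLE IF NOT EXISTS` when the flow starts, dropping it here is enough to reset sequence comparisons for the next run.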
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Co-authored.
    
    Generated-by: Claude-Opus-4.7-thinking-xhigh
    
    Closes #56122 from 
AnishMahto/SPARK-56957-implement-SCD1-execution-streaming-flow.
    
    Lead-authored-by: AnishMahto <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 7632d37a50ef725fe1d622ca50975d5ec71d545e)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   6 +
 python/pyspark/pipelines/api.py                    |   5 +
 .../pipelines/autocdc/AutoCdcReservedNames.scala   |  32 +
 .../spark/sql/pipelines/autocdc/ChangeArgs.scala   |  16 +-
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala |  47 +-
 .../spark/sql/pipelines/graph/DatasetManager.scala |  14 +
 .../apache/spark/sql/pipelines/graph/Flow.scala    |   5 +-
 .../spark/sql/pipelines/graph/FlowExecution.scala  | 208 ++++++
 .../spark/sql/pipelines/graph/FlowPlanner.scala    |  37 +-
 .../sql/pipelines/autocdc/AutoCdcFlowSuite.scala   |  14 +-
 .../graph/AutoCdcGraphExecutionTestMixin.scala     | 213 ++++++
 .../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala | 241 +++++++
 .../graph/AutoCdcScd1FullRefreshSuite.scala        | 245 +++++++
 .../graph/AutoCdcScd1MultiPipelineSuite.scala      | 296 +++++++++
 .../graph/AutoCdcScd1SchemaEvolutionSuite.scala    | 733 +++++++++++++++++++++
 .../graph/AutoCdcScd1SinglePipelineSuite.scala     | 216 ++++++
 .../AutoCdcScd1TargetTableDurabilitySuite.scala    | 159 +++++
 .../graph/ConnectInvalidPipelineSuite.scala        |   2 +-
 18 files changed, 2455 insertions(+), 34 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index b792afc47b57..1c03cea7fcd3 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -256,6 +256,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE" : {
+    "message" : [
+      "Cannot start AutoCDC flow: the target table <tableName> (format: 
<format>) does not support row-level operations. AutoCDC requires a target 
backed by a connector that supports MERGE."
+    ],
+    "sqlState" : "0A000"
+  },
   "AVRO_CANNOT_WRITE_NULL_FIELD" : {
     "message" : [
       "Cannot write null value for field <name> defined as non-null Avro data 
type <dataType>.",
diff --git a/python/pyspark/pipelines/api.py b/python/pyspark/pipelines/api.py
index 578b28ec3793..084547f4c2b1 100644
--- a/python/pyspark/pipelines/api.py
+++ b/python/pyspark/pipelines/api.py
@@ -556,6 +556,11 @@ def create_auto_cdc_flow(
     Note that for keys, sequence_by, column_list, and except_column_list the 
arguments have to
     be column identifiers without qualifiers, e.g. they cannot be 
col("sourceTable.keyId").
 
+    The set and types of `keys` are part of the Auto CDC flow's persisted 
state. Changing keys
+    across incremental runs (renaming, swapping, growing, shrinking, or 
changing the type of a
+    key column) is not supported and will produce undefined behavior. To 
change the key set,
+    fully refresh the target table.
+
     :param target: The name of the target table that receives the Auto CDC 
flow.
     :param source: The name of the CDC source to stream from.
     :param keys: The column or combination of columns that uniquely identify a 
row in the source \
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
new file mode 100644
index 000000000000..2b0f8e293e76
--- /dev/null
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+/**
+ * Names that AutoCDC reserves for its own use, both for internal columns it 
inserts during
+ * reconciliation (e.g. `${prefix}metadata`, `${prefix}winning_row`) and for 
internal tables it
+ * manages alongside user-defined targets (e.g. the per-target auxiliary state 
table).
+ *
+ * A single recognizable prefix gives a single auditable answer to "what does 
AutoCDC own", and
+ * lets user-defined columns and tables be unambiguously distinguished from 
AutoCDC-managed ones.
+ */
+private[pipelines] object AutoCdcReservedNames {
+
+  /** Common reserved-name prefix shared by AutoCDC internal columns and 
internal tables. */
+  val prefix: String = "__spark_autocdc_"
+}
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
index b975e06807f5..c475377ba506 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -129,13 +129,23 @@ private[pipelines] object CaseSensitivityLabels {
 }
 
 /** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */
-sealed trait ScdType
+sealed trait ScdType {
+  /**
+   * Short, stable label for this SCD type. Persisted as table property on 
AutoCDC flow auxiliary
+   * tables.
+   */
+  def label: String
+}
 
 object ScdType {
   /** Representation for the standard SCD1 strategy. */
-  case object Type1 extends ScdType
+  case object Type1 extends ScdType {
+    override val label: String = "SCD1"
+  }
   /** Representation for the standard SCD2 strategy. */
-  case object Type2 extends ScdType
+  case object Type2 extends ScdType {
+    override val label: String = "SCD2"
+  }
 }
 
 /**
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index aaea3b63e9ef..0035f442fb00 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -367,19 +367,29 @@ case class Scd1BatchProcessor(
     val incomingWinsDelete = microbatchDeleteVersionField.isNotNull &&
       microbatchDeleteVersionField > destinationUpsertVersionField
 
-    // When the incoming upsert wins against an existing record, the entire 
row (all columns)
-    // will be overwritten, including the CDC metadata column. We only exclude 
keys because
-    // most merge implementations require that join columns are not being 
mutated, even if
-    // the mutation is a no-op.
     val resolver = microbatchDf.sparkSession.sessionState.conf.resolver
     val keyNames = changeArgs.keys.map(_.name)
+
+    def constructTargetColumnAssignmentsFromMicrobatch(columnName: String): 
(String, Column) = {
+      // Map a column in the target table to its direct equivalent in the 
microbatch. Note that
+      // because of target-table schema evolution during SDP dataset 
materialization, the
+      // microbatch's columns are always a subset of (or equal to) the 
target's columns.
+      val quotedCol = QuotingUtils.quoteIdentifier(columnName)
+      s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
+    }
+
+    // Most merge implementations require that join columns are not mutated, 
even when the
+    // mutation would be a no-op. The remaining microbatch columns (including 
the CDC metadata
+    // column) are overwritten outright when the incoming upsert wins.
     val columnsToUpdateWhenIncomingWinsUpsert: Map[String, Column] =
       microbatchDf.columns
         .filterNot(c => keyNames.exists(resolver(_, c)))
-        .map { c =>
-          val quotedCol = QuotingUtils.quoteIdentifier(c)
-          s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
-        }
+        .map(constructTargetColumnAssignmentsFromMicrobatch)
+        .toMap
+
+    val columnsToInsertOnNewKey: Map[String, Column] =
+      microbatchDf.columns
+        .map(constructTargetColumnAssignmentsFromMicrobatch)
         .toMap
 
     microbatchDf
@@ -391,7 +401,12 @@ case class Scd1BatchProcessor(
       // New key: only insert upserts; deletes for absent keys are no-ops for 
the target table
       // merge, and instead would have been inserted as tombstones into the 
auxiliary table.
       .whenNotMatched(microbatchDeleteVersionField.isNull)
-      .insertAll()
+      // When inserting a brand new row for a new key, construct column 
mappings from microbatch.
+      // The microbatch's columns may be a strict subset of the target's 
columns -- e.g. the user
+      // narrowed `column_list` between runs, or the source DF dropped a 
column. The target's
+      // columns can never be a strict subset of the microbatch's, however, 
because SDP's schema
+      // evolution always unions old and new schemas onto the target.
+      .insert(columnsToInsertOnNewKey)
       .merge()
   }
 
@@ -417,17 +432,15 @@ case class Scd1BatchProcessor(
 
 object Scd1BatchProcessor {
   /**
-   * Reserved column-name prefix for internal SDP AutoCDC processing. Source 
change-data-feed
-   * dataframes must not contain any columns starting with this prefix; the 
invariant is
+   * Internal columns inserted by AutoCDC reconciliation. Source 
change-data-feed dataframes must
+   * not contain any columns starting with [[AutoCdcReservedNames.prefix]]; 
the invariant is
    * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] 
construction.
    */
-  private[pipelines] val reservedColumnNamePrefix: String = "__spark_autocdc_"
-
-  private[autocdc] val winningRowColName: String = 
s"${reservedColumnNamePrefix}winning_row"
-  private[pipelines] val cdcMetadataColName: String = 
s"${reservedColumnNamePrefix}metadata"
+  private[autocdc] val winningRowColName: String = 
s"${AutoCdcReservedNames.prefix}winning_row"
+  private[pipelines] val cdcMetadataColName: String = 
s"${AutoCdcReservedNames.prefix}metadata"
 
-  private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
-  private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
+  private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence"
+  private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence"
 
   /** Project the delete sequence out of the CDC metadata column. */
   private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column =
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
index 4affbe4637db..456edca8d1e2 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
@@ -303,6 +303,20 @@ object DatasetManager extends Logging {
       context.spark.sql(s"TRUNCATE TABLE ${table.identifier.quotedString}")
     }
 
+    if (isFullRefresh) {
+      // On full refresh, drop the AutoCDC auxiliary state associated with 
this table (if any) so
+      // that stale delete-tracking data and table properties are not carried 
forward into the new
+      // table generation. We unconditionally issue the DROP for every 
fully-refreshed target.
+
+      // Intentionally DROP and not TRUNCATE: the auxiliary table is an 
internal state store
+      // that is not part of the dataflow graph, so it does not participate in 
regular schema
+      // evolution like user tables do. On a full refresh we want a clean 
recreation against
+      // the new target schema rather than carrying forward the previous 
generation's layout.
+
+      val auxiliaryTableId = AutoCdcAuxiliaryTable.identifier(table.identifier)
+      context.spark.sql(s"DROP TABLE IF EXISTS 
${auxiliaryTableId.quotedString}")
+    }
+
     // Alter the table if we need to
     existingTableOpt.foreach { existingTable =>
       val existingSchema = v2ColumnsToStructType(existingTable.columns())
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index 04ef8d3186c5..9f357ef026b0 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.{AliasIdentifier, 
TableIdentifier}
 import org.apache.spark.sql.classic.DataFrame
 import org.apache.spark.sql.pipelines.AnalysisWarning
 import org.apache.spark.sql.pipelines.autocdc.{
+  AutoCdcReservedNames,
   CaseSensitivityLabels,
   ChangeArgs,
   ColumnSelection,
@@ -271,7 +272,7 @@ class AutoCdcMergeFlow(
   }
 
   /** The DataType of the sequencing expression, derived once from the source 
change feed. */
-  private val sequencingType: DataType =
+  private[graph] val sequencingType: DataType =
     df.select(changeArgs.sequencing).schema.head.dataType
 
   /**
@@ -335,7 +336,7 @@ class AutoCdcMergeFlow(
    */
   private def requireReservedPrefixAbsentInSourceColumns(): Unit = {
     val resolver = spark.sessionState.conf.resolver
-    val reservedPrefix = Scd1BatchProcessor.reservedColumnNamePrefix
+    val reservedPrefix = AutoCdcReservedNames.prefix
 
     def nameContainsReservedPrefix(name: String): Boolean = {
       name.length >= reservedPrefix.length && resolver(
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index 13a5621947d5..ea151830f544 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -23,12 +23,24 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.concurrent.{ExecutionContext, Future}
 import scala.util.control.NonFatal
 
+import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, LogKeys}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.SparkSession
+import org.apache.spark.sql.connector.catalog.{Identifier, 
SupportsRowLevelOperations, TableCatalog}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.pipelines.autocdc.{
+  AutoCdcReservedNames,
+  ChangeArgs,
+  Scd1BatchProcessor,
+  Scd1ForeachBatchHandler
+}
 import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers
 import org.apache.spark.sql.pipelines.util.SparkSessionUtils
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -301,3 +313,199 @@ class SinkWrite(
       .start()
   }
 }
+
+object AutoCdcAuxiliaryTable {
+  /**
+   * Helper for deriving the auxiliary AutoCDC catalog table identifier from a 
target table. If a
+   * table exists with a name matching the name derived here, it is assumed to 
be an AutoCDC
+   * auxiliary table that should be managed by the pipeline.
+   */
+  def identifier(destination: TableIdentifier): TableIdentifier = 
TableIdentifier(
+    table = s"${AutoCdcReservedNames.prefix}aux_state_${destination.table}",
+    database = destination.database,
+    catalog = destination.catalog
+  )
+
+  /**
+   * Reserved table property key set on the auxiliary table to record which 
SCD strategy it
+   * serves.
+   */
+  val scdTypePropertyKey: String = 
s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scd_type"
+}
+
+/**
+ * Base trait for AutoCDC merge-based write flows.
+ */
+trait AutoCdcMergeWriteBase {
+  /** The spark session the AutoCDC flow is going to be planned in. */
+  protected def spark: SparkSession
+
+  /** The destination (target) table entity the AutoCDC flow will be writing 
to. */
+  protected def destination: Table
+
+  /** The AutoCDC flow's [[ChangeArgs]] (keys, sequencing, columnSelection, 
...). */
+  protected def changeArgs: ChangeArgs
+
+  /** Full schema of the auxiliary table for this SCD type. */
+  protected def auxiliaryTableSchema: StructType
+
+  /**
+   * Idempotently create the auxiliary table for [[destination]] if it does 
not already exist
+   * and return its [[TableIdentifier]].
+   *
+   * Note that this is `CREATE TABLE IF NOT EXISTS`: when the aux table 
already exists, its
+   * schema is left untouched and `auxiliaryTableSchema` is ignored. For SCD1, 
the keys must be
+   * invariant across executions and the CDC metadata will always be present, 
so this is correct.
+   */
+  protected def createAuxiliaryTableIfNotExists(spark: SparkSession): 
TableIdentifier = {
+    val auxIdent = AutoCdcAuxiliaryTable.identifier(destination.identifier)
+    // The auxiliary table inherits the target's format so MERGE semantics 
line up. When the
+    // target's format is unspecified (None), omit the USING clause and fall 
back to the
+    // session's default source provider.
+    val usingClause = destination.format.map(fmt => s"USING 
$fmt").getOrElse("")
+    val tblPropertiesClause =
+      s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.scdTypePropertyKey}' = " +
+        s"'${changeArgs.storedAsScdType.label}')"
+    spark.sql(
+      s"""CREATE TABLE IF NOT EXISTS
+         |${auxIdent.quotedString}
+         |(${auxiliaryTableSchema.toDDL}) $usingClause 
$tblPropertiesClause""".stripMargin
+    )
+    auxIdent
+  }
+
+  /**
+   * Validate that the target table's underlying connector implements
+   * [[SupportsRowLevelOperations]], which is the V2 connector contract for 
MERGE/UPDATE/DELETE
+   * with rewrite - all operations that the AutoCDC transformation executes.
+   */
+  protected def requireDestinationSupportsRowLevelOps(): Unit = {
+    val (catalog, v2Identifier) = resolveTableCatalog(spark, 
destination.identifier)
+    val destinationTable = catalog.loadTable(v2Identifier)
+
+    if (!destinationTable.isInstanceOf[SupportsRowLevelOperations]) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE",
+        messageParameters = Map(
+          "tableName" -> destination.identifier.quotedString,
+          "format" -> destination.format.orElse(
+              Option(
+                destinationTable.properties.get(TableCatalog.PROP_PROVIDER)
+              )
+            )
+            .getOrElse("<unknown>")
+        )
+      )
+    }
+  }
+
+  private def resolveTableCatalog(
+      spark: SparkSession,
+      ident: TableIdentifier): (TableCatalog, Identifier) = {
+    val catalogManager = spark.sessionState.catalogManager
+    val catalogPlugin = ident.catalog
+      .map(catalogManager.catalog)
+      .getOrElse(catalogManager.currentCatalog)
+    val catalog = catalogPlugin match {
+      case t: TableCatalog => t
+      case _ => throw 
QueryCompilationErrors.missingCatalogTablesAbilityError(catalogPlugin)
+    }
+    val namespace = ident.database.getOrElse(
+      throw SparkException.internalError(
+        s"Cannot resolve table identifier ${ident.quotedString}: namespace is 
unspecified."
+      )
+    )
+    (catalog, Identifier.of(Array(namespace), ident.table))
+  }
+}
+
+/**
+ * A [[StreamingFlowExecution]] that applies a CDC event stream to a target 
[[Table]] via
+ * SCD Type 1 MERGE semantics.
+ */
+class Scd1MergeStreamingWrite(
+    val identifier: TableIdentifier,
+    val flow: AutoCdcMergeFlow,
+    val graph: DataflowGraph,
+    val updateContext: PipelineUpdateContext,
+    val checkpointPath: String,
+    val trigger: Trigger,
+    val destination: Table,
+    val sqlConf: Map[String, String]
+) extends StreamingFlowExecution with AutoCdcMergeWriteBase {
+
+  requireDestinationSupportsRowLevelOps()
+
+  override def getOrigin: QueryOrigin = flow.origin
+
+  override protected def changeArgs: ChangeArgs = flow.changeArgs
+
+  override def startStream(): StreamingQuery = {
+    val sourceChangeDataFeed = graph.reanalyzeFlow(flow).df
+
+    // The auxiliary table is created here (at flow execution) rather than 
during flow resolution
+    // or dataset materialization for two reasons:
+    //   1. It is an internal state store: we deliberately keep it out of the 
graph registration
+    //      context's table set so that it is invisible to other flows and the 
[[DatasetManager]]
+    //      will never materialize it.
+    //   2. Its format must match the target table's, which only exists after 
the target is
+    //      materialized. Flow resolution must also stay side-effect free 
(e.g. for dry runs).
+    val auxiliaryTableIdentifier = createAuxiliaryTableIfNotExists(spark = 
updateContext.spark)
+
+    val foreachBatchHandler = Scd1ForeachBatchHandler(
+      batchProcessor = Scd1BatchProcessor(
+        changeArgs = flow.changeArgs,
+        resolvedSequencingType = flow.sequencingType
+      ),
+      auxiliaryTableIdentifier = auxiliaryTableIdentifier,
+      targetTableIdentifier = destination.identifier
+    )
+
+    sourceChangeDataFeed.writeStream
+      .queryName(displayName)
+      .option("checkpointLocation", checkpointPath)
+      .trigger(trigger)
+      .foreachBatch((batch: Dataset[Row], batchId: Long) => {
+        foreachBatchHandler.execute(batch, batchId)
+      })
+      .start()
+  }
+
+  override protected lazy val auxiliaryTableSchema: StructType =
+    // SCD1's auxiliary table is just keys + the CDC metadata struct; no user 
data columns. Keys
+    // come first, in `changeArgs.keys` declaration order, to anchor the 
per-key sequence
+    // watermark used to gate out-of-order events.
+    StructType(autoCdcKeyFields :+ cdcMetadataField)
+
+  /**
+   * AutoCDC key columns resolved out of the flow's augmented schema, in
+   * `changeArgs.keys` declaration order. Keys are guaranteed to be present in 
the schema
+   * because [[AutoCdcMergeFlow.schema]] validates that.
+   */
+  private lazy val autoCdcKeyFields: Seq[StructField] = {
+    val resolver = updateContext.spark.sessionState.conf.resolver
+    val targetTableSchema = flow.schema
+    flow.changeArgs.keys.map { key =>
+      targetTableSchema.fields
+        .find(field => resolver(field.name, key.name))
+        .getOrElse(
+          throw SparkException.internalError(
+            s"Key column '${key.name}' was not found in the AutoCDC flow's 
selected schema."
+          )
+        )
+    }
+  }
+
+  /** CDC metadata field resolved out of the flow's augmented schema. */
+  private lazy val cdcMetadataField: StructField = {
+    val resolver = updateContext.spark.sessionState.conf.resolver
+    flow.schema.fields
+      .find(field => resolver(field.name, 
Scd1BatchProcessor.cdcMetadataColName))
+      .getOrElse(
+        throw SparkException.internalError(
+          s"CDC metadata column '${Scd1BatchProcessor.cdcMetadataColName}' was 
not found in the " +
+          s"AutoCDC flow's target table schema."
+        )
+      )
+  }
+}
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala
index 29e2da4a5e13..8251780524a2 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowPlanner.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.pipelines.graph
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.pipelines.autocdc.ScdType
 import org.apache.spark.sql.streaming.Trigger
 
 /**
@@ -73,10 +75,30 @@ class FlowPlanner(
               trigger = triggerFor(sf),
               checkpointPath = flowMetadata.latestCheckpointLocation
             )
-          case _ =>
-            throw new UnsupportedOperationException(
-              s"Unsupported destination type: ${output.getClass.getSimpleName} 
for " +
-              s"streaming flow ${sf.identifier} 
(${flow.destinationIdentifier})"
+          case _ => unsupportedDestinationType(sf, output)
+        }
+      case acmf: AutoCdcMergeFlow =>
+        acmf.changeArgs.storedAsScdType match {
+          case ScdType.Type1 =>
+            val flowMetadata = FlowSystemMetadata(updateContext, acmf, graph)
+            output match {
+              case o: Table =>
+                new Scd1MergeStreamingWrite(
+                  identifier = acmf.identifier,
+                  flow = acmf,
+                  graph = graph,
+                  updateContext = updateContext,
+                  checkpointPath = flowMetadata.latestCheckpointLocation,
+                  trigger = triggerFor(acmf),
+                  destination = o,
+                  sqlConf = acmf.sqlConf
+                )
+              case _ => unsupportedDestinationType(acmf, output)
+            }
+          case ScdType.Type2 =>
+            throw new AnalysisException(
+              errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
+              messageParameters = Map.empty
             )
         }
       case _ =>
@@ -85,4 +107,11 @@ class FlowPlanner(
         )
     }
   }
+
+  private def unsupportedDestinationType(flow: ResolvedFlow, output: Output): 
Nothing = {
+    throw new UnsupportedOperationException(
+      s"Unsupported destination type: ${output.getClass.getSimpleName} for " +
+      s"flow ${flow.identifier} writing to ${flow.destinationIdentifier}"
+    )
+  }
 }
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
index 8d365906559b..65eafd6c7dcc 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -409,7 +409,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     "Contract: a source df column with the reserved AutoCDC prefix is rejected 
at flow " +
     "construction"
   ) {
-    val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+    val conflictingName = s"${AutoCdcReservedNames.prefix}foo"
     val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
 
     checkError(
@@ -422,7 +422,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
         "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
         "columnName" -> conflictingName,
         "schemaName" -> "changeDataFeed",
-        "reservedColumnNamePrefix" -> 
Scd1BatchProcessor.reservedColumnNamePrefix
+        "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
       )
     )
   }
@@ -435,7 +435,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     // from any ChangeArgs path still fails at construction with a different 
error. The
     // reservation is on the name itself, not on its presence in the source 
feed.
     val cleanSourceDf = threeColumnSourceDf()
-    val reservedName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+    val reservedName = s"${AutoCdcReservedNames.prefix}foo"
 
     val keysEx = intercept[AnalysisException] {
       newAutoCdcMergeFlow(
@@ -487,7 +487,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
         "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
         "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
         "schemaName" -> "changeDataFeed",
-        "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix
+        "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
       )
     )
   }
@@ -497,7 +497,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
   ) {
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
       val conflictingName =
-        s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+        s"${AutoCdcReservedNames.prefix}foo".toUpperCase(Locale.ROOT)
       val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
 
       checkError(
@@ -510,7 +510,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
           "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
           "columnName" -> conflictingName,
           "schemaName" -> "changeDataFeed",
-          "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix
+          "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
         )
       )
     }
@@ -524,7 +524,7 @@ class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
     // `spark.sql.caseSensitive`, consistent with the schema-augmentation logic in this class.
     withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
       val nonConflictingName =
-        s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo".toUpperCase(Locale.ROOT)
+        s"${AutoCdcReservedNames.prefix}foo".toUpperCase(Locale.ROOT)
       val sourceDf = sourceDfWithExtraColumns(nonConflictingName -> StringType)
 
       // No exception expected: construction succeeds.
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
new file mode 100644
index 000000000000..5e2286a4fd56
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.scalatest.{BeforeAndAfterEach, Suite}
+
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.{Column, Row}
+import org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.autocdc.{
+  ChangeArgs,
+  ColumnSelection,
+  Scd1BatchProcessor,
+  ScdType,
+  UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.common.RunState
+import org.apache.spark.sql.pipelines.logging.RunProgress
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Shared helpers for AutoCDC end-to-end graph-execution test suites.
+ */
+trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach {
+  self: Suite with ExecutionTest with SharedSparkSession =>
+
+  /** v2 catalog name registered for AutoCDC E2E tests. Tests qualify tables as `cat.ns1.t`. */
+  protected val catalog: String = "cat"
+
+  /** Namespace under [[catalog]] used by AutoCDC E2E tests. */
+  protected val namespace: String = "ns1"
+
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    spark.conf.set(
+      s"spark.sql.catalog.$catalog",
+      classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName
+    )
+    // Disable per-flow retries so failure-path tests (e.g. INCOMPATIBLE_DATA) surface the
+    // AnalysisException after the first attempt instead of going through the default 2 retries,
+    // which would otherwise emit duplicate FAILED events and inflate test runtime without
+    // changing the asserted outcome.
+    spark.conf.set(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key, "0")
+    spark.sql(s"CREATE NAMESPACE IF NOT EXISTS $catalog.$namespace")
+  }
+
+  override protected def afterEach(): Unit = {
+    SharedTablesInMemoryRowLevelOperationTableCatalog.reset()
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$catalog")
+    spark.sessionState.conf.unsetConf(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key)
+    super.afterEach()
+  }
+
+  /**
+   * Run a pipeline to completion. If any flow emitted a [[RunProgress]] event with state
+   * [[RunState.FAILED]], collect every error from the event buffer and throw a single
+   * exception listing them, so that test failures surface meaningful stack traces instead of
+   * generic "test exited normally but flow failed" errors.
+   */
+  protected def runPipeline(ctx: TestGraphRegistrationContext): Unit = {
+    val updateCtx = TestPipelineUpdateContext(spark, ctx.toDataflowGraph, storageRoot)
+    updateCtx.pipelineExecution.runPipeline()
+    updateCtx.pipelineExecution.awaitCompletion()
+
+    if (updateCtx.eventBuffer.getEvents.exists(_.details == RunProgress(RunState.FAILED))) {
+      val errors = updateCtx.eventBuffer.getEvents.flatMap(_.error)
+      val ex = new RuntimeException(
+        s"Pipeline run failed with ${errors.size} error(s):\n" +
+        errors.map { e =>
+          val stackSnippet = e.getStackTrace
+            .map(f => s"    at $f")
+            .mkString("\n")
+          s"  ${e.getClass.getSimpleName}: ${e.getMessage}\n$stackSnippet"
+        }.mkString("\n")
+      )
+      errors.foreach(ex.addSuppressed)
+      throw ex
+    }
+  }
+
+  /**
+   * Walk every [[Throwable]] reachable from `failure` via [[Throwable#getSuppressed]] and
+   * [[Throwable#getCause]], searching for the first [[SparkThrowable]] whose
+   * [[SparkThrowable#getCondition]] equals `condition`, then run [[checkError]] against that
+   * exception with all of its other arguments propagated through.
+   */
+  protected def checkErrorInPipelineFailure(
+      failure: Throwable,
+      condition: String,
+      sqlState: Option[String] = None,
+      parameters: Map[String, String] = Map.empty,
+      matchPVals: Boolean = false,
+      queryContext: Array[ExpectedContext] = Array.empty): Unit = {
+
+    def causeChain(t: Throwable): Iterator[Throwable] =
+      Iterator.iterate[Throwable](t)(_.getCause).takeWhile(_ != null)
+
+    def reachable: Iterator[Throwable] =
+      (Iterator(failure) ++ failure.getSuppressed.iterator).flatMap(causeChain)
+
+    val matched = reachable.collectFirst {
+      case t: SparkThrowable if t.getCondition == condition => t
+    }
+    assert(
+      matched.isDefined,
+      s"Expected a SparkThrowable with condition '$condition' reachable from the runPipeline " +
+      s"failure chain, got top-level: ${failure.getMessage}; chain:\n" +
+      reachable
+        .map(t => s"  ${t.getClass.getSimpleName}: ${t.getMessage}")
+        .mkString("\n")
+    )
+    checkError(
+      exception = matched.get,
+      condition = condition,
+      sqlState = sqlState,
+      parameters = parameters,
+      matchPVals = matchPVals,
+      queryContext = queryContext
+    )
+  }
+
+  /**
+   * DDL fragment for the AutoCDC metadata column appended to every SCD1 target table. Use
+   * inside a `CREATE TABLE` statement, for example:
+   *   `CREATE TABLE t (id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)`
+   *
+   * Assumes sequence type is BIGINT (Long).
+   */
+  protected val cdcMetadataDdl: String = {
+    val col = Scd1BatchProcessor.cdcMetadataColName
+    val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName
+    val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName
+    s"$col STRUCT<$del:BIGINT,$ups:BIGINT> NOT NULL"
+  }
+
+  /**
+   * Insert a pre-existing row into a target table, populating the CDC metadata struct so the
+   * row looks as if a previous AutoCDC run upserted it at sequencing version [[sequence]].
+   *
+   * @param table     Fully-qualified table name (catalog.schema.table).
+   * @param colValues Comma-separated SQL literals for the user-defined columns, in declared
+   *                  order, excluding the trailing CDC metadata column.
+   * @param sequence  Value to seed `_cdc_metadata.upsertSequence` with. The
+   *                  `deleteSequence` field is left NULL.
+   */
+  protected def insertPreloadedRow(table: String, colValues: String, sequence: Long): Unit = {
+    val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName
+    val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName
+    spark.sql(
+      s"INSERT INTO $table SELECT $colValues, " +
+      s"named_struct('$del', CAST(NULL AS BIGINT), '$ups', CAST($sequence AS BIGINT))"
+    )
+  }
+
+  /** Catalog identifier of the AutoCDC auxiliary table for [[targetTableName]]. */
+  protected def auxTableNameFor(targetTableName: String): String = {
+    val targetIdent = fullyQualifiedIdentifier(targetTableName, Some(catalog), Some(namespace))
+    AutoCdcAuxiliaryTable.identifier(targetIdent).unquotedString
+  }
+
+  /**
+   * Construct an [[AutoCdcFlow]] targeting `catalog.namespace.${target}` from the given
+   * query and CDC knobs.
+   */
+  protected def autoCdcFlow(
+      name: String,
+      target: String,
+      query: FlowFunction,
+      keys: Seq[String],
+      sequencing: Column,
+      columnSelection: Option[ColumnSelection] = None,
+      deleteCondition: Option[Column] = None,
+      scdType: ScdType = ScdType.Type1
+  ): AutoCdcFlow = AutoCdcFlow(
+    identifier = fullyQualifiedIdentifier(name, Some(catalog), Some(namespace)),
+    destinationIdentifier = fullyQualifiedIdentifier(target, Some(catalog), Some(namespace)),
+    func = query,
+    queryContext = QueryContext(
+      currentCatalog = Some(catalog),
+      currentDatabase = Some(namespace)
+    ),
+    origin = QueryOrigin.empty,
+    changeArgs = ChangeArgs(
+      keys = keys.map(UnqualifiedColumnName(_)),
+      sequencing = sequencing,
+      columnSelection = columnSelection,
+      deleteCondition = deleteCondition,
+      storedAsScdType = scdType
+    )
+  )
+
+  /** Build a target row's `_cdc_metadata` struct value. */
+  protected def cdcMeta(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row =
+    Row(deleteSeq.orNull, upsertSeq.orNull)
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
new file mode 100644
index 000000000000..50ff60556a73
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.{
+  ColumnSelection,
+  Scd1BatchProcessor,
+  UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering the durability of AutoCDC's auxiliary table across pipeline runs:
+ * the per-key sequence watermarks recorded in the auxiliary table must persist between
+ * incremental runs, and the auxiliary table must be transparently recreated if it is
+ * deleted out-of-band.
+ */
+class AutoCdcScd1AuxiliaryTableDurabilitySuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("a higher-sequence event in a later pipeline run correctly upserts the row") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Single MemoryStream reused across both pipeline runs so the streaming checkpoint can
+    // resume cleanly.
+    val changeDataFeedStream = MemoryStream[(Int, String, Long)]
+    def buildGraphRegistrationContext(): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(
+            changeDataFeedStream.toDF().toDF("id", "name", "version")
+          ),
+          keys = Seq("id"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    // Run #1: insert id=1 at seq=1.
+    changeDataFeedStream.addData((1, "alice", 1L))
+    runPipeline(buildGraphRegistrationContext())
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: upsert id=1 at seq=2 (must replace) and insert id=2 at seq=1 (new key).
+    // The auxiliary table from run #1 persists and continues to gate seq comparisons.
+    changeDataFeedStream.addData((1, "alice2", 2L), (2, "bob", 1L))
+    runPipeline(buildGraphRegistrationContext())
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice2", 2L, cdcMeta(None, Some(2L))),
+        Row(2, "bob", 1L, cdcMeta(None, Some(1L)))
+      )
+    )
+  }
+
+  test("an event with a sequence lower than what was applied in a prior pipeline run " +
+    "is suppressed") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Single MemoryStream reused across both runs so the streaming checkpoint can resume.
+    val stream = MemoryStream[(Int, String, Long, Boolean)]
+    def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version", "is_delete")),
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        deleteCondition = Some(functions.col("is_delete") === true),
+        columnSelection = Some(ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName("is_delete"))
+        ))
+      ))
+    }
+
+    // Run #1: delete id=1 at seq=10. Auxiliary table records seq=10 as the watermark.
+    stream.addData((1, "alice", 10L, true))
+    runPipeline(buildCtx())
+    checkAnswer(spark.table(s"$catalog.$namespace.target"), Seq.empty)
+
+    // Run #2: late upsert at seq=5 (< the persisted seq=10 watermark). Must be rejected.
+    stream.addData((1, "stale", 5L, false))
+    runPipeline(buildCtx())
+
+    // Auxiliary table watermark from run #1 (seq=10) should keep rejecting the seq=5 event.
+    checkAnswer(spark.table(s"$catalog.$namespace.target"), Seq.empty)
+  }
+
+  test("the auxiliary table places the AutoCDC key column first, ahead of any non-key " +
+    "source columns") {
+    val session = spark
+    import session.implicits._
+
+    // Source DF column order is (name, id, version): the AutoCDC key column `id` does NOT
+    // appear first in the source DF. The auxiliary table must still write `id` as its
+    // leading column.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(name STRING, id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(String, Int, Long)]
+    stream.addData(("alice", 1, 1L))
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("name", "id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    val auxSchema = spark.table(auxTableNameFor("target")).schema
+
+    // The auxiliary table only contains keys and the metadata column, hence "name" should not be
+    // included.
+    assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName))
+  }
+
+  test("the auxiliary table preserves the user's declared key order, independent of the " +
+    "source DataFrame and target table column orders") {
+    val session = spark
+    import session.implicits._
+
+    // Source DF: (value, id, region, version). Target table: (value, id, region, version,
+    // _cdc_metadata) -- same ordering as the source. The user, however, declares
+    // `keys = Seq("region", "id")` -- the OPPOSITE order from how those columns appear in
+    // both the source DF and the target. The auxiliary table should honor the user's
+    // declared key order, not either physical column ordering, so subsequent runs
+    // positionally compare keys against the same recorded layout.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(value STRING, id INT NOT NULL, region STRING NOT NULL, " +
+      s"version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(String, Int, String, Long)]
+    stream.addData(("v", 1, "us", 1L))
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("value", "id", "region", "version")),
+        keys = Seq("region", "id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    val auxSchema = spark.table(auxTableNameFor("target")).schema
+    assert(auxSchema.fieldNames.toSeq ==
+      Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName))
+  }
+
+  test("if the AutoCDC auxiliary table is dropped between runs, it is transparently " +
+    "recreated") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Single MemoryStream reused across both runs so the streaming checkpoint can resume.
+    val stream = MemoryStream[(Int, Long)]
+    def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    stream.addData((1, 1L))
+    runPipeline(buildCtx())
+    assert(spark.catalog.tableExists(auxTableNameFor("target")))
+
+    // Manually drop the auxiliary table.
+    spark.sql(s"DROP TABLE ${auxTableNameFor("target")}")
+    assert(!spark.catalog.tableExists(auxTableNameFor("target")))
+
+    stream.addData((1, 2L))
+    runPipeline(buildCtx())
+
+    // The dropped auxiliary table must be transparently recreated.
+    assert(spark.catalog.tableExists(auxTableNameFor("target")))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, 2L, cdcMeta(None, Some(2L))))
+    )
+  }
+
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala
new file mode 100644
index 000000000000..94ba7e20aed1
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1FullRefreshSuite.scala
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.{
+  ColumnSelection,
+  UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering AutoCDC's full-refresh semantics: full refresh must wipe both the
+ * target rows and the AutoCDC auxiliary table for the refreshed targets, and must leave
+ * non-refreshed targets untouched in selective-refresh mode.
+ */
+class AutoCdcScd1FullRefreshSuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("full refresh wipes target rows and the auxiliary table for the refreshed flow") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Run #1: populate target + auxiliary table.
+    val stream1 = MemoryStream[(Int, String, Long)]
+    stream1.addData((1, "alice", 5L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+    assert(
+      spark.catalog.tableExists(auxTableNameFor("target")),
+      "Auxiliary table should exist after first run"
+    )
+
+    // Run #2 (full refresh): auxiliary table should be dropped by DatasetManager, target
+    // truncated. The new run brings only id=2 at seq=1.
+    val stream2 = MemoryStream[(Int, String, Long)]
+    stream2.addData((2, "bob", 1L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    val updateCtx = TestPipelineUpdateContext(
+      spark,
+      ctx2.toDataflowGraph,
+      storageRoot,
+      fullRefreshTables = AllTables
+    )
+    updateCtx.pipelineExecution.runPipeline()
+    updateCtx.pipelineExecution.awaitCompletion()
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(2, "bob", 1L, cdcMeta(None, Some(1L))))
+    )
+  }
+
+  test("after a full refresh, an event with a sequence below the previous run's " +
+    "watermark now lands") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Run #1: delete at seq=10 sets a high watermark in the auxiliary table.
+    val stream1 = MemoryStream[(Int, String, Long, Boolean)]
+    stream1.addData((1, "alice", 10L, true))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version", "is_delete")),
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        deleteCondition = Some(functions.col("is_delete") === true),
+        columnSelection = Some(ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName("is_delete"))
+        ))
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2 (full refresh): auxiliary table is dropped, watermark reset. seq=5 should
+    // now land.
+    val stream2 = MemoryStream[(Int, String, Long, Boolean)]
+    stream2.addData((1, "fresh", 5L, false))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version", "is_delete")),
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        deleteCondition = Some(functions.col("is_delete") === true),
+        columnSelection = Some(ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName("is_delete"))
+        ))
+      ))
+    }
+    val updateCtx = TestPipelineUpdateContext(
+      spark,
+      ctx2.toDataflowGraph,
+      storageRoot,
+      fullRefreshTables = AllTables
+    )
+    updateCtx.pipelineExecution.runPipeline()
+    updateCtx.pipelineExecution.awaitCompletion()
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "fresh", 5L, cdcMeta(None, Some(5L))))
+    )
+  }
+
+  test("selective full refresh wipes only the requested target's auxiliary state") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_a " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_b " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // streamA is replaced across runs because t_a is full-refreshed in run #2 (its streaming
+    // checkpoint is reset by full-refresh, so a fresh source is fine and matches the user-visible
+    // semantics). streamB is reused across runs because t_b is NOT full-refreshed -- its
+    // streaming checkpoint must resume against the same MemoryStream instance, otherwise the
+    // seq=5 assertion below could pass for the wrong reason (the source never produced seq=5
+    // in run #2 instead of the aux watermark suppressing it).
+    val streamA1 = MemoryStream[(Int, Long)]
+    val streamB = MemoryStream[(Int, Long)]
+    streamA1.addData((1, 10L))
+    streamB.addData((1, 10L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+      registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_a",
+        target = "t_a",
+        query = dfFlowFunc(streamA1.toDF().toDF("id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+      registerFlow(autoCdcFlow(
+        name = "flow_b",
+        target = "t_b",
+        query = dfFlowFunc(streamB.toDF().toDF("id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2: full refresh ONLY on t_a; t_b's auxiliary state must persist.
+    val streamA2 = MemoryStream[(Int, Long)]
+    streamA2.addData((1, 5L))   // would have been suppressed pre-refresh; now wins
+    streamB.addData((1, 5L))    // must be suppressed (auxiliary table retains seq=10)
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+      registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_a",
+        target = "t_a",
+        query = dfFlowFunc(streamA2.toDF().toDF("id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+      registerFlow(autoCdcFlow(
+        name = "flow_b",
+        target = "t_b",
+        query = dfFlowFunc(streamB.toDF().toDF("id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    val updateCtx = TestPipelineUpdateContext(
+      spark,
+      ctx2.toDataflowGraph,
+      storageRoot,
+      fullRefreshTables = SomeTables(Set(
+        fullyQualifiedIdentifier("t_a", Some(catalog), Some(namespace))
+      ))
+    )
+    updateCtx.pipelineExecution.runPipeline()
+    updateCtx.pipelineExecution.awaitCompletion()
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_a"),
+      Seq(Row(1, 5L, cdcMeta(None, Some(5L))))
+    )
+    // t_b: pre-existing seq=10 row still wins; the seq=5 event is dropped.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_b"),
+      Seq(Row(1, 10L, cdcMeta(None, Some(10L))))
+    )
+  }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
new file mode 100644
index 000000000000..32f34923c480
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * End-to-end tests that exercise interactions between separate AutoCDC pipelines (i.e.
+ * distinct [[DataflowGraph]] / [[TestPipelineUpdateContext]] invocations) sharing the same
+ * v2 catalog. These complement the single-pipeline AutoCDC suites by validating the
+ * boundary semantics between independently-deployed pipelines.
+ *
+ * Each test constructs two graphs and runs them sequentially. In real deployments these
+ * could be two different pipeline definitions writing into the same metastore; the tests
+ * here verify that AutoCDC's per-target catalog state (target table, auxiliary table,
+ * schema invariants) behaves correctly across these pipeline boundaries.
+ */
+class AutoCdcScd1MultiPipelineSuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("two AutoCDC pipelines targeting separate tables maintain independent target and " +
+    "auxiliary tables") {
+    val session = spark
+    import session.implicits._
+
+    // Two distinct target tables created up-front.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_a " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_b " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1 only knows about `t_a`. Its auxiliary table
+    // cat.ns1.__spark_autocdc_aux_state_t_a must not affect pipeline #2's `t_b`.
+    val streamA = MemoryStream[(Int, String, Long)]
+    streamA.addData((1, "alice", 100L))
+    val ctxA = new TestGraphRegistrationContext(spark) {
+      registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_a",
+        target = "t_a",
+        query = dfFlowFunc(streamA.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctxA)
+
+    // Pipeline #2 only knows about `t_b`. Uses a deliberately *lower* sequence to verify
+    // the watermark from pipeline #1's auxiliary table (seq=100) does not leak into
+    // pipeline #2.
+    val streamB = MemoryStream[(Int, String, Long)]
+    streamB.addData((9, "bob", 1L))
+    val ctxB = new TestGraphRegistrationContext(spark) {
+      registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_b",
+        target = "t_b",
+        query = dfFlowFunc(streamB.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctxB)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_a"),
+      Seq(Row(1, "alice", 100L, cdcMeta(None, Some(100L))))
+    )
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_b"),
+      Seq(Row(9, "bob", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Each target has its own auxiliary table; no cross-contamination.
+    assert(spark.catalog.tableExists(auxTableNameFor("t_a")))
+    assert(spark.catalog.tableExists(auxTableNameFor("t_b")))
+  }
+
+  test("a downstream pipeline can read an AutoCDC target written by a different pipeline " +
+    "without observing the CDC metadata column") {
+    val session = spark
+    import session.implicits._
+
+    // Pipeline #1 writes into target `src` via AutoCDC.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.src " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    val stream = MemoryStream[(Int, String, Long)]
+    stream.addData((1, "alice", 1L), (2, "bob", 1L))
+    val ctxWriter = new TestGraphRegistrationContext(spark) {
+      registerTable("src", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "writer",
+        target = "src",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctxWriter)
+
+    // Pipeline #2 is a regular materialized view that selects the user-data columns from
+    // `src` (a different graph entirely). It must observe the merged AutoCDC rows and be
+    // able to ignore the metadata column without it polluting downstream consumers.
+    val ctxReader = new TestGraphRegistrationContext(spark) {
+      registerMaterializedView(
+        "downstream_mv",
+        query = dfFlowFunc(
+          spark.read.table(s"$catalog.$namespace.src").select("id", "name", "version")
+        )
+      )
+    }
+    runPipeline(ctxReader)
+
+    checkAnswer(
+      spark.table(fullyQualifiedIdentifier("downstream_mv").toString),
+      Seq(Row(1, "alice", 1L), Row(2, "bob", 1L))
+    )
+  }
+
+  test("two AutoCDC pipelines targeting the same table with identical key and data " +
+    "schemas merge into a shared target table") {
+    val session = spark
+    import session.implicits._
+
+    // Target table is created once up-front; both pipelines target it with the same
+    // AutoCDC `keys` and the same source-DF data schema. The two pipelines have distinct
+    // flow names ("flow_v1" / "flow_v2") so they own independent streaming checkpoints,
+    // but share the target table and its auxiliary table.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.shared_target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1: inserts rows with id=1 and id=2 at version=1.
+    val stream1 = MemoryStream[(Int, String, Long)]
+    stream1.addData((1, "alice", 1L), (2, "bob", 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v1",
+        target = "shared_target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Sanity-check pipeline #1's effect before pipeline #2 runs.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.shared_target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))),
+        Row(2, "bob", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)))
+      )
+    )
+
+    // Pipeline #2: updates id=2 (existing key) to a higher sequence and inserts id=3
+    // (new key). id=1 is untouched and must survive into the final target unchanged.
+    val stream2 = MemoryStream[(Int, String, Long)]
+    stream2.addData((2, "bob-v2", 2L), (3, "carol", 1L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v2",
+        target = "shared_target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx2)
+
+    // Final target: id=1 untouched (pipeline #1's state), id=2 updated by pipeline #2,
+    // id=3 freshly inserted by pipeline #2.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.shared_target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))),
+        Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L))),
+        Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)))
+      )
+    )
+
+    // The auxiliary table for the shared target is itself shared across both pipelines.
+    assert(spark.catalog.tableExists(auxTableNameFor("shared_target")))
+  }
+
+  test("two AutoCDC pipelines targeting the same table with the same key but different " +
+    "data columns evolve the shared target schema") {
+    val session = spark
+    import session.implicits._
+
+    // Target is created up-front with pipeline #1's schema only; pipeline #2 brings a new
+    // top-level nullable `age` column that the dataset materialization layer is expected
+    // to schema-merge into the target.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.shared_target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1: source DF schema is (id, name, version); inserts id=1 and id=2.
+    val stream1 = MemoryStream[(Int, String, Long)]
+    stream1.addData((1, "alice", 1L), (2, "bob", 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v1",
+        target = "shared_target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Sanity-check pipeline #1's state before schema evolution kicks in.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.shared_target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L))),
+        Row(2, "bob", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)))
+      )
+    )
+
+    // Pipeline #2: source DF schema is (id, name, age, version). The new nullable `age` column
+    // should be added to the target by dataset materialization; pipeline #1's untouched id=1 row
+    // is backfilled to NULL.
+    val stream2 = MemoryStream[(Int, String, Option[Int], Long)]
+    stream2.addData((2, "bob-v2", Some(25), 2L), (3, "carol", Some(30), 1L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v2",
+        target = "shared_target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "age", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx2)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.shared_target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), null),
+        Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), 25),
+        Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), 30)
+      )
+    )
+
+    // Pipeline #1 runs again with its original (id, name, version) schema. The evolved
+    // target schema with `age` must persist: id=1's update leaves age untouched, id=4 is
+    // inserted with age=NULL, and pipeline #2's id=2/id=3 rows are unchanged.
+    stream1.addData((1, "alice-v2", 2L), (4, "dave", 1L))
+    runPipeline(ctx1)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.shared_target"),
+      Seq(
+        Row(1, "alice-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), null),
+        Row(2, "bob-v2", 2L, cdcMeta(deleteSeq = None, upsertSeq = Some(2L)), 25),
+        Row(3, "carol", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), 30),
+        Row(4, "dave", 1L, cdcMeta(deleteSeq = None, upsertSeq = Some(1L)), null)
+      )
+    )
+  }
+
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
new file mode 100644
index 000000000000..4c20b21ad57a
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import java.sql.Timestamp
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.autocdc.{
+  ColumnSelection,
+  UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering AutoCDC's interaction with schema evolution across pipeline runs. The
+ * suite documents the supported additive cases (new top-level columns, new nested fields
+ * in array-of-struct, broadening / narrowing column selection) and the cases that fail
+ * loudly today (subtractive nested evolution, type-incompatible changes, case-only
+ * renames).
+ *
+ * These behaviors are largely inherited from the lower layers (`SchemaMergingUtils` for
+ * schema merge, the v2 writer's column-resolution layer for nested-field handling) rather
+ * than implemented in AutoCDC itself; the tests here serve as the contract for AutoCDC's
+ * observable behavior on top of those layers.
+ */
+class AutoCdcScd1SchemaEvolutionSuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("a nullable non-key column merges correctly with mixed NULL and non-NULL values") {
+    val session = spark
+    import session.implicits._
+
+    // Single MemoryStream with `email` as nullable from the start. Run #1 emits a row with
+    // a NULL email; run #2 emits an upsert with a non-NULL email.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, email STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, Option[String], Long)]
+    def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    // Run #1: insert with NULL email.
+    stream.addData((1, "alice", None, 1L))
+    runPipeline(buildCtx())
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", null, 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: upsert with non-NULL email at higher seq replaces the row.
+    stream.addData((1, "alice2", Some("[email protected]"), 2L))
+    runPipeline(buildCtx())
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice2", "[email protected]", 2L, cdcMeta(None, Some(2L))))
+    )
+  }
+
+  test("widening a non-key column's type between runs fails with " +
+    "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+    val session = spark
+    import session.implicits._
+
+    // Changing a non-key column's type between pipeline runs is rejected by
+    // `SchemaMergingUtils` with CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE even when the new type
+    // is strictly wider. Users must full-refresh the target to change column types.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, age INT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Int, Long)]
+    stream1.addData((1, 30, 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "age", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2: widen `age` from Int to Long.
+    val stream2 = MemoryStream[(Int, Long, Long)]
+    stream2.addData((1, 31L, 2L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "age", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+      sqlState = Some("42825"),
+      // `left` is the persisted (run #1) INT type; `right` is run #2's widened BIGINT.
+      parameters = Map(
+        "left" -> "\"INT\"",
+        "right" -> "\"BIGINT\""
+      )
+    )
+  }
+
+  test("narrowing a non-key column's type between runs fails with " +
+    "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+    val session = spark
+    import session.implicits._
+
+    // Mirror image of the widening test above: changing a non-key column's type between
+    // pipeline runs is rejected by SchemaMergingUtils with CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE
+    // even when the new type is strictly narrower.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, payload BIGINT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long, Long)]
+    stream1.addData((1, 100L, 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "payload", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2: narrow `payload` from Long (BIGINT) to Int (INT).
+    val stream2 = MemoryStream[(Int, Int, Long)]
+    stream2.addData((1, 5, 2L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "payload", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+      sqlState = Some("42825"),
+      // `left` is the persisted (run #1) BIGINT type; `right` is run #2's narrowed INT.
+      parameters = Map(
+        "left" -> "\"BIGINT\"",
+        "right" -> "\"INT\""
+      )
+    )
+  }
+
+  test("a new top-level nullable column appearing in the source DF between runs is " +
+    "added to the target") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Single MemoryStream of (id, name, email, version) shared across runs so the streaming
+    // checkpoint can resume cleanly. Run #1's flow drops `email` so the source's resolved DF
+    // schema is 3 columns; run #2 keeps all 4. The MemoryStream's underlying tuple schema is
+    // unchanged (only the downstream projection differs), so the source identity that the
+    // OffsetSeqLog records is stable across runs.
+    val stream = MemoryStream[(Int, String, Option[String], Long)]
+    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+        val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projectedDf),
+          keys = Seq("id"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    // Run #1: source projects (id, name, version). Target schema is unchanged.
+    stream.addData((1, "alice", None, 1L))
+    runPipeline(buildCtx(includeEmail = false))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: source projects (id, name, email, version). mergeSchemas appends `email` to
+    // the target (StructType.merge keeps the left schema's order and appends right-only
+    // fields); existing rows get NULL for the new column.
+    stream.addData((2, "bob", Some("[email protected]"), 2L))
+    runPipeline(buildCtx(includeEmail = true))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(None, Some(1L)), null),
+        Row(2, "bob", 2L, cdcMeta(None, Some(2L)), "[email protected]")
+      )
+    )
+  }
+
+  test("broadening the column selection between runs adds the newly-included column to " +
+    "the target") {
+    val session = spark
+    import session.implicits._
+
+    // Source DF schema is fixed at (id, name, email, version) across both runs. Only the
+    // `columnSelection` knob differs: run #1 includes (id, name, version); run #2 selects
+    // None (= all source columns). mergeSchemas adds `email` to the target via the same
+    // generic SDP path as the new-source-column case, but driven by the
+    // [[ColumnSelection]] knob rather than the source DF's own schema.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, String, Long)]
+    def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+          keys = Seq("id"),
+          sequencing = functions.col("version"),
+          columnSelection = selection
+        ))
+      }
+
+    // Run #1: only (id, name, version) selected; `email` is dropped before the MERGE.
+    stream.addData((1, "alice", "ignored", 1L))
+    runPipeline(buildCtx(selection = Some(ColumnSelection.IncludeColumns(
+      Seq("id", "name", "version").map(UnqualifiedColumnName(_))
+    ))))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: broaden to no selection. mergeSchemas adds `email`; existing rows get NULL,
+    // new rows get the actual value.
+    stream.addData((2, "bob", "[email protected]", 2L))
+    runPipeline(buildCtx(selection = None))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(None, Some(1L)), null),
+        Row(2, "bob", 2L, cdcMeta(None, Some(2L)), "[email protected]")
+      )
+    )
+  }
+
+  test("narrowing the column selection between runs preserves the dropped column on " +
+    "existing rows and leaves it NULL on new rows") {
+    val session = spark
+    import session.implicits._
+
+    // Validates the additive-only column-selection contract on the narrowing side:
+    // tightening `columnSelection` between runs leaves the dropped column in place at the
+    // schema level (SDP's `SchemaMergingUtils.mergeSchemas` is a union, never a subtraction).
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, email STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, String, Long)]
+    def buildCtx(selection: Option[ColumnSelection]): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(stream.toDF().toDF("id", "name", "email", "version")),
+          keys = Seq("id"),
+          sequencing = functions.col("version"),
+          columnSelection = selection
+        ))
+      }
+
+    // Run #1: include all columns; populate `email` for key=1.
+    stream.addData((1, "alice", "[email protected]", 1L))
+    runPipeline(buildCtx(selection = None))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", "[email protected]", 1L, cdcMeta(None, Some(1L))))
+    )
+
+    // Run #2: narrow the selection to drop `email`. The merge omits `email` from both
+    // INSERT and UPDATE assignment maps; key=1's `email` is preserved at "[email protected]" while
+    // key=2 is inserted with `email = NULL`.
+    stream.addData((2, "bob", "ignored", 2L))
+    runPipeline(buildCtx(selection = Some(ColumnSelection.IncludeColumns(
+      Seq("id", "name", "version").map(UnqualifiedColumnName(_))
+    ))))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", "[email protected]", 1L, cdcMeta(None, Some(1L))),
+        Row(2, "bob", null, 2L, cdcMeta(None, Some(2L)))
+      )
+    )
+  }
+
+  test("a top-level column dropped from the source DF between runs is preserved on " +
+    "existing rows and left NULL on new rows") {
+    val session = spark
+    import session.implicits._
+
+    // Symmetric to the new-source-column case (which exercises the source DF *gaining* a
+    // column). Validates that the additive-only column-selection contract holds when the
+    // narrowing is driven by the source DF's own schema shrinking, rather than by a
+    // tightening [[ChangeArgs.columnSelection]].
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Same `MemoryStream[(Int, String, Option[String], Long)]` shape across runs; runs
+    // differ in whether `email` is kept in the projected source DF.
+    val stream = MemoryStream[(Int, String, Option[String], Long)]
+    def buildCtx(includeEmail: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val sourceDf = stream.toDF().toDF("id", "name", "email", "version")
+        val projectedDf = if (includeEmail) sourceDf else sourceDf.drop("email")
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projectedDf),
+          keys = Seq("id"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    // Run #1: wide source DF (id, name, email, version). mergeSchemas appends `email` to
+    // the target.
+    stream.addData((1, "alice", Some("[email protected]"), 1L))
+    runPipeline(buildCtx(includeEmail = true))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L)), "[email protected]"))
+    )
+
+    // Run #2: source DF drops `email` upstream of the flow. Target still has `email`
+    // (`StructType.merge` is additive-only); the merge omits `email` from both INSERT and
+    // UPDATE assignment maps. Key=1's `email` is preserved at "[email protected]"; key=2 is inserted
+    // with `email = NULL`.
+    stream.addData((2, "bob", None, 2L))
+    runPipeline(buildCtx(includeEmail = false))
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", 1L, cdcMeta(None, Some(1L)), "[email protected]"),
+        Row(2, "bob", 2L, cdcMeta(None, Some(2L)), null)
+      )
+    )
+  }
+
+  test("dropping a nested struct field between runs fails with INCOMPATIBLE_DATA_FOR_TABLE") {
+    val session = spark
+    import session.implicits._
+
+    // The v2 writer's column-resolution layer requires every nested target field to be
+    // present in the microbatch DF. When run #2's source projection drops `b.c`, the merge
+    // fails with INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA. Users who want to drop a
+    // nested field between runs must full-refresh the target.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+      s"value STRUCT<a:INT,b:STRUCT<c:INT,d:INT>>, $cdcMetadataDdl)"
+    )
+
+    // Stream is (key, version, a, b_c, b_d). Each run reshapes into different `value`
+    // shapes; the underlying tuple shape is unchanged so the streaming source's identity
+    // is stable across runs.
+    val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+    def buildCtx(includeC: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+        val inner = if (includeC) {
+          functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+        } else {
+          functions.struct(functions.col("b_d").as("d"))
+        }
+        val projected = src.select(
+          functions.col("key"),
+          functions.col("version"),
+          functions.struct(functions.col("a"), inner.as("b")).as("value")
+        )
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projected),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
+    runPipeline(buildCtx(includeC = true))
+
+    // Run #2 drops b.c. The v2 writer rejects the merge because it cannot find data for
+    // the target's `value.b.c` column.
+    stream.addData((1, 2L, 10, 99, 10), (3, 1L, 3, 99, 3))
+    val ex = intercept[RuntimeException] { runPipeline(buildCtx(includeC = false)) }
+    // The V2 writer's `TableOutputResolver` produces this error during plan analysis with
+    // an empty `tableName` because the merge plan it analyzes does not carry the target's
+    // catalog identifier through to the resolver call site.
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`value`.`b`.`c`"
+      )
+    )
+  }
+
+  test("a new field added inside an array<struct> element between runs is added to the " +
+    "target") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+      s"vals ARRAY<STRUCT<a:INT,b:STRUCT<c:INT>>>, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+    def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+        val inner = if (includeD) {
+          functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+        } else {
+          functions.struct(functions.col("b_c").as("c"))
+        }
+        val projected = src.select(
+          functions.col("key"),
+          functions.col("version"),
+          functions.array(
+            functions.struct(functions.col("a"), inner.as("b"))
+          ).as("vals")
+        )
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projected),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    stream.addData((1, 1L, 1, 1, 99))
+    runPipeline(buildCtx(includeD = false))
+
+    // Run #2 widens to include b.d. Existing key=1 row's vals[0].b.d is NULL until the
+    // upsert at version=2 writes the new value.
+    stream.addData((1, 2L, 1, 1, 2), (3, 1L, 3, 3, 3))
+    runPipeline(buildCtx(includeD = true))
+
+    // Inline-explode flattens the array<struct> for assertion.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target")
+        .selectExpr("key", "inline(vals) as (a, b)")
+        .select("key", "a", "b.c", "b.d"),
+      Seq(
+        Row(1, 1, 1, 2),
+        Row(3, 3, 3, 3)
+      )
+    )
+  }
+
+  test("dropping a field inside an array<struct> element between runs fails with " +
+    "INCOMPATIBLE_DATA_FOR_TABLE") {
+    val session = spark
+    import session.implicits._
+
+    // Symmetric to the nested-struct case, but for `array<struct>`. The v2 writer rejects
+    // the merge because it cannot find data for the target's `vals.element.b.d` column
+    // when run #2's projection drops `d` from the element struct. Users must full-refresh
+    // the target to drop a nested array-element field.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(key INT NOT NULL, version BIGINT NOT NULL, " +
+      s"vals ARRAY<STRUCT<a:INT,b:STRUCT<c:INT,d:INT>>>, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, Long, Int, Int, Int)]
+    def buildCtx(includeD: Boolean): TestGraphRegistrationContext =
+      new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        val src = stream.toDF().toDF("key", "version", "a", "b_c", "b_d")
+        val inner = if (includeD) {
+          functions.struct(functions.col("b_c").as("c"), functions.col("b_d").as("d"))
+        } else {
+          functions.struct(functions.col("b_c").as("c"))
+        }
+        val projected = src.select(
+          functions.col("key"),
+          functions.col("version"),
+          functions.array(
+            functions.struct(functions.col("a"), inner.as("b"))
+          ).as("vals")
+        )
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(projected),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+    stream.addData((1, 1L, 1, 1, 1), (2, 1L, 2, 2, 2))
+    runPipeline(buildCtx(includeD = true))
+
+    stream.addData((1, 2L, 10, 10, 99), (3, 1L, 3, 3, 99))
+    val ex = intercept[RuntimeException] { runPipeline(buildCtx(includeD = false)) }
+    // See the nested-struct test above for why `tableName` is empty here.
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+      parameters = Map(
+        "tableName" -> "``",
+        "colName" -> "`vals`.`element`.`b`.`d`"
+      )
+    )
+  }
+
+  test("a source DF column whose name differs from the target only by case fails with " +
+    "AMBIGUOUS_REFERENCE under case-insensitive resolution") {
+    val session = spark
+    import session.implicits._
+
+    // `DatasetManager`'s schema-merge compares the existing target schema and the flow's
+    // output schema *case-sensitively*: `SchemaMergingUtils.mergeSchemas` calls
+    // `StructType.merge` without forwarding the session-level case-sensitivity. When the
+    // target has `value` and the source DF emits `Value`, the merged schema ends up with
+    // both as separate columns. Reference resolution downstream is case-insensitive
+    // (Spark's default), so the MERGE plan trips on the duplicate and reports
+    // AMBIGUOUS_REFERENCE.
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      spark.sql(
+        s"CREATE TABLE $catalog.$namespace.target " +
+        s"(key INT NOT NULL, version BIGINT NOT NULL, value STRING, $cdcMetadataDdl)"
+      )
+
+      val stream = MemoryStream[(Int, Long, String)]
+      stream.addData((1, 1L, "alice"))
+      val ctx = new TestGraphRegistrationContext(spark) {
+        registerTable("target", catalog = Some(catalog), database = Some(namespace))
+        // Source DF emits `Value` (capital), differing only in case from the target's
+        // `value` column.
+        val df = stream.toDF().toDF("key", "version", "Value")
+        registerFlow(autoCdcFlow(
+          name = "auto_cdc_flow",
+          target = "target",
+          query = dfFlowFunc(df),
+          keys = Seq("key"),
+          sequencing = functions.col("version")
+        ))
+      }
+
+      val ex = intercept[RuntimeException] { runPipeline(ctx) }
+      // The exact `name` and `referenceNames` parameters depend on internal merge-plan
+      // synthesis; the condition match is the meaningful invariant for this test.
+      checkErrorInPipelineFailure(
+        failure = ex,
+        condition = "AMBIGUOUS_REFERENCE",
+        parameters = Map(
+          "name" -> ".*",
+          "referenceNames" -> ".*"
+        ),
+        matchPVals = true,
+        queryContext = Array(
+          ExpectedContext(
+            fragment = s"`$catalog`.`$namespace`.`target`.`Value`",
+            start = 0,
+            stop = 27
+          )
+        )
+      )
+    }
+  }
+
+  test("extra columns on the target that the AutoCDC flow does not emit are preserved " +
+    "across the merge") {
+    val session = spark
+    import session.implicits._
+
+    // The target is wider than the AutoCDC flow's source DF: column `extra` is present on
+    // the target but never produced by the flow. AutoCDC must tolerate the extra target
+    // column -- pre-existing rows keep their `extra` value, and newly-inserted rows
+    // resolve `extra` to NULL.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, extra INT, $cdcMetadataDdl)"
+    )
+    insertPreloadedRow(
+      s"$catalog.$namespace.target",
+      colValues = "1, 'preloaded', 0, 42",
+      sequence = 0L
+    )
+
+    val stream = MemoryStream[(Int, String, Long)]
+    stream.addData((1, "alice", 1L), (2, "bob", 1L))
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target").select("id", "name", "version", "extra"),
+      Seq(
+        Row(1, "alice", 1L, 42), // extra preserved on the upsert
+        Row(2, "bob", 1L, null) // extra is NULL for inserts
+      )
+    )
+  }
+
+  test("changing a non-key column type from TIMESTAMP to STRING between runs fails with " +
+    "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE") {
+    val session = spark
+    import session.implicits._
+
+    // `mergeSchemas` rejects an incompatible type change between TIMESTAMP and STRING.
+    // Captured alongside the type-widening / type-narrowing tests; users must full-refresh
+    // the target to change a column's type.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(key INT NOT NULL, version BIGINT NOT NULL, value TIMESTAMP, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long, Timestamp)]
+    stream1.addData((1, 1L, Timestamp.valueOf("2024-01-01 10:00:00")))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("key", "version", "value")),
+        keys = Seq("key"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Run #2 emits `value` as STRING. mergeSchemas rejects the type change.
+    val stream2 = MemoryStream[(Int, Long, String)]
+    stream2.addData((1, 2L, "2024-01-02 11:00:00"))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("key", "version", "value")),
+        keys = Seq("key"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "CANNOT_MERGE_INCOMPATIBLE_DATA_TYPE",
+      sqlState = Some("42825"),
+      // `left` is the persisted (run #1) TIMESTAMP type; `right` is run #2's STRING.
+      parameters = Map(
+        "left" -> "\"TIMESTAMP\"",
+        "right" -> "\"STRING\""
+      )
+    )
+  }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala
new file mode 100644
index 000000000000..f06b8c461533
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SinglePipelineSuite.scala
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.{
+  ChangeArgs,
+  ColumnSelection,
+  ScdType,
+  UnqualifiedColumnName
+}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Smoke tests for AutoCDC SCD type 1 flows running within a single pipeline: one
+ * [[DataflowGraph]] / [[TestPipelineUpdateContext]] executes one or more AutoCDC flows,
+ * and the target table contents are asserted at the end. Multi-pipeline scenarios (where
+ * multiple pipelines write to the same target) live in [[AutoCdcScd1MultiPipelineSuite]].
+ */
+ */
+class AutoCdcScd1SinglePipelineSuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("an upsert event lands a new row in an empty target table") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, Long)]
+    stream.addData((1, "alice", 1L))
+
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    runPipeline(ctx)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+    )
+  }
+
+  test("consecutive upsert, delete, and re-upsert events for the same key in one run " +
+    "converge to the latest event") {
+    val session = spark
+    import session.implicits._
+
+    // Target schema deliberately omits `is_delete`: the source carries it as a control
+    // column that drives the deleteCondition and is excluded from the target projection.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, String, Long, Boolean)]
+    stream.addData(
+      (1, "alice", 1L, false), // initial upsert
+      (1, "alice", 2L, true),  // delete
+      (1, "alice2", 3L, false) // reinsert
+    )
+
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version", "is_delete")),
+        keys = Seq("id"),
+        sequencing = functions.col("version"),
+        deleteCondition = Some(functions.col("is_delete") === true),
+        columnSelection = Some(ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName("is_delete"))
+        ))
+      ))
+    }
+
+    runPipeline(ctx)
+
+    // After all three events at seqs 1, 2, 3: row "alice2" wins as the highest-sequenced
+    // upsert; the delete at seq=2 is superseded by the seq=3 upsert.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice2", 3L, cdcMeta(None, Some(3L))))
+    )
+  }
+
+  test("two AutoCDC flows targeting separate tables in one pipeline produce independent " +
+    "results") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_a " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.t_b " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val streamA = MemoryStream[(Int, Long)]
+    val streamB = MemoryStream[(Int, Long)]
+    streamA.addData((1, 1L), (2, 1L))
+    streamB.addData((10, 1L))
+
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("t_a", catalog = Some(catalog), database = Some(namespace))
+      registerTable("t_b", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_a",
+        target = "t_a",
+        query = dfFlowFunc(streamA.toDF().toDF("id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+      registerFlow(autoCdcFlow(
+        name = "flow_b",
+        target = "t_b",
+        query = dfFlowFunc(streamB.toDF().toDF("id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_a"),
+      Seq(Row(1, 1L, cdcMeta(None, Some(1L))), Row(2, 1L, cdcMeta(None, Some(1L))))
+    )
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.t_b"),
+      Seq(Row(10, 1L, cdcMeta(None, Some(1L))))
+    )
+    assert(spark.catalog.tableExists(auxTableNameFor("t_a")))
+    assert(spark.catalog.tableExists(auxTableNameFor("t_b")))
+  }
+
+  test("an AutoCDC flow targeting a table whose format does not support row-level " +
+    "operations fails with AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE") {
+    val session = spark
+    import session.implicits._
+
+    // Intentionally use a non-merge-compatible catalog, whose default table format is parquet.
+    val catalog = TestGraphRegistrationContext.DEFAULT_CATALOG
+    val database = TestGraphRegistrationContext.DEFAULT_DATABASE
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$database.target_no_merge " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream = MemoryStream[(Int, Long)]
+    stream.addData((1, 1L))
+
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target_no_merge")
+      registerFlow(AutoCdcFlow(
+        identifier = fullyQualifiedIdentifier("auto_cdc_flow"),
+        destinationIdentifier = fullyQualifiedIdentifier("target_no_merge"),
+        func = dfFlowFunc(stream.toDF().toDF("id", "version")),
+        queryContext = QueryContext(
+          currentCatalog = Some(catalog),
+          currentDatabase = Some(database)
+        ),
+        origin = QueryOrigin.empty,
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("id")),
+          sequencing = functions.col("version"),
+          storedAsScdType = ScdType.Type1
+        )
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_TARGET_DOES_NOT_SUPPORT_MERGE",
+      sqlState = Some("0A000"),
+      parameters = Map(
+        "tableName" -> s"`$catalog`.`$database`.`target_no_merge`",
+        "format" -> "parquet"
+      )
+    )
+  }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
new file mode 100644
index 000000000000..46f8ee47db02
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.pipelines.autocdc.Scd1BatchProcessor
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests covering AutoCDC's behavior when the target table is pre-populated by something
+ * other than a prior AutoCDC run: pre-loaded rows, missing CDC metadata column on the
+ * target, and rows with NULL CDC metadata. These cases verify that AutoCDC interoperates
+ * gracefully with users who hand-populate the target table.
+ */
+ */
+class AutoCdcScd1TargetTableDurabilitySuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("pre-loaded rows: an event with a lower sequence is suppressed and a higher one " +
+    "wins") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    insertPreloadedRow(s"$catalog.$namespace.target", "1, 'alice', 5", 5L)
+    insertPreloadedRow(s"$catalog.$namespace.target", "2, 'bob', 5", 5L)
+
+    val stream = MemoryStream[(Int, String, Long)]
+    stream.addData(
+      (1, "stale", 2L),  // < pre-existing seq=5 -> ignored
+      (2, "bob2", 10L)   // > pre-existing seq=5 -> upserts
+    )
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(
+        Row(1, "alice", 5L, cdcMeta(None, Some(5L))),
+        Row(2, "bob2", 10L, cdcMeta(None, Some(10L)))
+      )
+    )
+  }
+
+  test("pre-loaded target rows merge correctly on the first AutoCDC run, and the " +
+    "auxiliary table is created lazily") {
+    val session = spark
+    import session.implicits._
+
+    // Target was populated by some external process; this is the first AutoCDC run.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    insertPreloadedRow(s"$catalog.$namespace.target", "1, 'alice', 1", 1L)
+
+    assert(
+      !spark.catalog.tableExists(auxTableNameFor("target")),
+      "Auxiliary table should not exist before the first AutoCDC run"
+    )
+
+    val stream = MemoryStream[(Int, String, Long)]
+    stream.addData((1, "bob", 2L))
+
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    // seq=2 > pre-existing seq=1, so "bob" replaces "alice" via the upsert sequence column.
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "bob", 2L, cdcMeta(None, Some(2L))))
+    )
+    assert(
+      spark.catalog.tableExists(auxTableNameFor("target")),
+      "Auxiliary table should be created lazily on the first AutoCDC run"
+    )
+  }
+
+  test("a target table created without the CDC metadata column gets the column " +
+    "auto-added on the first AutoCDC run") {
+    val session = spark
+    import session.implicits._
+
+    // User creates the target without the AutoCDC metadata column. DatasetManager evolves
+    // the existing table schema by merging it with the AutoCdcMergeFlow's output schema,
+    // which includes the metadata column. The first run therefore proceeds normally, and
+    // subsequent reads see the metadata struct alongside the user's data columns.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, name STRING, version BIGINT NOT NULL)"
+    )
+
+    val stream = MemoryStream[(Int, String, Long)]
+    stream.addData((1, "alice", 1L))
+
+    val ctx = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(stream.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx)
+
+    val schema = spark.table(s"$catalog.$namespace.target").schema
+    assert(
+      schema.fieldNames.contains(Scd1BatchProcessor.cdcMetadataColName),
+      s"Target must have ${Scd1BatchProcessor.cdcMetadataColName} after first AutoCDC run; " +
+      s"got ${schema.fieldNames.toSeq}"
+    )
+    checkAnswer(
+      spark.table(s"$catalog.$namespace.target"),
+      Seq(Row(1, "alice", 1L, cdcMeta(None, Some(1L))))
+    )
+  }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
index f19fed4e5780..8dad5019c0fe 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/ConnectInvalidPipelineSuite.scala
@@ -660,7 +660,7 @@ class ConnectInvalidPipelineSuite extends PipelineTest with SharedSparkSession {
   ) {
     // Temporary views in SDP normally accept either streaming or batch producing flows, but
     // AutoCDC flows are an explicit exception: SCD reconciliation only runs at the
-    // streaming-table sink (`Scd1ForeachBatchExec`), so pointing an AutoCDC flow at a view
+    // streaming-table sink (`Scd1ForeachBatchHandler`), so pointing an AutoCDC flow at a view
     // would silently drop reconciliation and expose just the projected CDF to consumers.
     // `validateFlowStreamingness` rejects this case with a dedicated sub-condition under
     // INVALID_FLOW_QUERY_TYPE.

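Reader's note (not part of the commit): the SCD1 outcomes that the suites above assert, namely that the highest-sequenced event per key wins, that a stale event is suppressed, and that a delete is superseded by a later upsert, can be modelled in a few lines of plain Scala. This is only a sketch under stated assumptions. `Event`, `TargetRow`, and `Scd1Sketch.applyBatch` are hypothetical names invented for illustration; the sketch does not use Spark, does not model the auxiliary table or the CDC metadata column, and treats an event whose sequence equals the existing row's sequence as stale, which the tests above do not confirm.

```scala
// Hypothetical, simplified model of SCD type 1 reconciliation.
// It is NOT the Scd1MergeStreamingWrite / Scd1BatchProcessor implementation.
final case class Event(id: Int, name: String, version: Long, isDelete: Boolean)
final case class TargetRow(id: Int, name: String, version: Long)

object Scd1Sketch {
  /** Applies one batch of change events to an in-memory target keyed by `id`. */
  def applyBatch(target: Map[Int, TargetRow], events: Seq[Event]): Map[Int, TargetRow] = {
    // Within the batch, only the highest-sequenced event per key matters.
    val latestPerKey = events.groupBy(_.id).values.map(_.maxBy(_.version))

    latestPerKey.foldLeft(target) { (acc, ev) =>
      acc.get(ev.id) match {
        // Assumption: an event that is not newer than the stored row is ignored.
        case Some(existing) if existing.version >= ev.version => acc
        // A winning delete removes the row for that key.
        case _ if ev.isDelete => acc - ev.id
        // A winning upsert inserts or overwrites the row.
        case _ => acc.updated(ev.id, TargetRow(ev.id, ev.name, ev.version))
      }
    }
  }
}
```

Feeding this model the inputs of the "consecutive upsert, delete, and re-upsert" test leaves a single row for key 1 at version 3, and feeding it the pre-loaded-rows inputs keeps "alice" at version 5 while replacing "bob" with "bob2" at version 10, which matches the expectations asserted above.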

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
