This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 04190d01836d [SPARK-57113][SDP] Prevent AutoCDC keys from changing 
across SDP runs
04190d01836d is described below

commit 04190d01836d300f43ec4696f3f6a1e9b87cb4b8
Author: AnishMahto <[email protected]>
AuthorDate: Thu May 28 10:39:14 2026 -0700

    [SPARK-57113][SDP] Prevent AutoCDC keys from changing across SDP runs
    
    ### What changes were proposed in this pull request?
    
    **Problem**
    An AutoCDC flow's key columns determine how rows are matched during the 
merge into the target. If a user changes the declared keys between pipeline 
runs without a full refresh, the merge silently mis-routes rows (updates become 
inserts and vice versa) and corrupts the target. We need to detect a key change 
between runs and fail fast.
    
    **Proposed Solution**
    Record the resolved key columns as a JSON list into a reserved property on 
the auxiliary table the first time it is created; on subsequent runs, validate 
the flow's declared keys against this recorded value and fail with 
`AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT` on mismatch.
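    
    A minimal, Spark-free sketch of the comparison described above, for illustration only. `KeyField`, `KeyDriftSketch`, and the case-insensitive `resolver` are hypothetical stand-ins, and data types are modeled as plain strings. The actual check in this commit operates on `StructField`s and the session resolver.
    
    ```scala
    // Hypothetical model of a key column: a name plus a data type rendered as text.
    final case class KeyField(name: String, dataType: String)

    object KeyDriftSketch {
      // Stand-in for the session resolver (assumed case-insensitive here).
      val resolver: (String, String) => Boolean = _.equalsIgnoreCase(_)

      // Drift means: a different number of keys, or some expected key with no
      // same-name recorded counterpart, or a counterpart with a different type.
      def drifted(recorded: Seq[KeyField], expected: Seq[KeyField]): Boolean =
        recorded.length != expected.length ||
          expected.exists { e =>
            recorded.find(r => resolver(r.name, e.name)) match {
              case None => true
              case Some(r) => r.dataType != e.dataType
            }
          }
    }
    ```
    
    Because the lookup is by name rather than by position, reordering the declared keys alone is not reported as drift in this sketch.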
    
    **Implementation notes**
    Aux-table creation switches from `CREATE TABLE` DDL to the programmatic V2 
`TableCatalog.createTable` API, so the JSON property reaches storage verbatim 
(no SQL-literal escape layer). Three new structured sub-classes under 
`AUTOCDC_INVALID_STATE` cover corrupted-metadata cases, such as those caused by 
user tampering; all of them recommend a full refresh as the remedy.
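    
    One way to picture the escape-layer concern, as a hedged sketch rather than the code in this commit: interpolating a JSON value into a `TBLPROPERTIES` clause breaks as soon as a key column name contains a single quote, while a programmatic property map keeps the string unchanged. The property key `example.keyColumnNames` and the object name are hypothetical, and the map only models the idea of passing properties as values instead of SQL text.
    
    ```scala
    import java.util.{HashMap => JHashMap}

    object PropertyEscapingSketch {
      // Hypothetical property key, not the reserved key used by the pipeline.
      val key = "example.keyColumnNames"
      // JSON array holding one key column name that contains a single quote.
      val json = """["it's"]"""

      // Naive interpolation into a SQL clause with no escaping applied.
      def naiveTblProperties(k: String, v: String): String =
        s"TBLPROPERTIES ('$k' = '$v')"

      // Programmatic route, modeled as a plain Java map: the value is stored as-is.
      def viaMap(k: String, v: String): String = {
        val props = new JHashMap[String, String]()
        props.put(k, v)
        props.get(k)
      }
    }
    ```
    
    In the interpolated clause the quote inside `it's` ends the SQL string literal early, so the DDL route would need an extra escaping step that the map route does not.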
    
    ### Why are the changes needed?
    To prevent users from incorrectly changing AutoCDC keys across runs without 
a full refresh.
    
    ### Does this PR introduce _any_ user-facing change?
    No, unreleased change.
    
    ### How was this patch tested?
    Unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Claude Opus 4.7
    
    Closes #56160 from AnishMahto/prevent-autocdc-key-drift-v2.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  28 +
 .../spark/sql/pipelines/graph/FlowExecution.scala  | 263 ++++++++--
 .../sql/pipelines/util/PipelinesCatalogUtils.scala |  52 ++
 .../graph/AutoCdcAuxiliaryTableSuite.scala         | 100 ++++
 .../graph/AutoCdcGraphExecutionTestMixin.scala     |  10 +-
 .../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala |  90 +++-
 .../pipelines/graph/AutoCdcScd1KeyDriftSuite.scala | 566 +++++++++++++++++++++
 .../graph/AutoCdcScd1MultiPipelineSuite.scala      |  61 +++
 .../graph/AutoCdcScd1SchemaEvolutionSuite.scala    |   9 +-
 9 files changed, 1130 insertions(+), 49 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 71dbf1a1ebce..4a2591d548ed 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -209,6 +209,34 @@
     ],
     "sqlState" : "22023"
   },
+  "AUTOCDC_INVALID_STATE" : {
+    "message" : [
+      "AutoCDC flow <flowName> detected an invalid state:"
+    ],
+    "subClass" : {
+      "AUXILIARY_TABLE_KEY_COLUMN_MISSING" : {
+        "message" : [
+          "The auxiliary table <auxTableName> is missing key column 
<keyColumnName> that is recorded in its <propertyName> table property. The 
auxiliary table schema may be corrupted or have been modified externally. 
Perform a full refresh of the target table to recreate the auxiliary table."
+        ]
+      },
+      "AUXILIARY_TABLE_PROPERTY_MALFORMED" : {
+        "message" : [
+          "The auxiliary table <auxTableName> has a malformed <propertyName> 
property with raw value '<rawValue>'. The property must be a JSON array of 
strings (e.g. '[\"id\",\"region\"]'). The auxiliary table metadata may be 
corrupted or have been modified externally. Perform a full refresh of the 
target table to recreate the auxiliary table."
+        ]
+      },
+      "AUXILIARY_TABLE_PROPERTY_MISSING" : {
+        "message" : [
+          "The auxiliary table <auxTableName> is missing the required 
<propertyName> table property; cannot validate AutoCDC key columns. The 
auxiliary table metadata may be corrupted or have been modified externally. 
Perform a full refresh of the target table to recreate the auxiliary table."
+        ]
+      },
+      "KEY_SCHEMA_DRIFT" : {
+        "message" : [
+          "The AutoCDC flow's current key columns <expectedKeySchema> do not 
match the keys recorded in the auxiliary table <auxTableName> (recorded keys 
<recordedKeySchema>). AutoCDC does not support changing key columns or their 
types across incremental pipeline runs. To change keys, perform a full refresh 
of the target table."
+        ]
+      }
+    },
+    "sqlState" : "42000"
+  },
   "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA" : {
     "message" : [
       "Using <caseSensitivity> column name comparison, the AutoCDC key column 
`<keyColumnName>` is not present in the flow's selected source schema. AutoCDC 
requires every key column to be present in the source change-data feed and 
retained by any configured column selection."
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index ea151830f544..0d1c33be2172 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ThreadPoolExecutor
 import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.concurrent.{ExecutionContext, Future}
+import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
 import org.apache.spark.SparkException
@@ -29,8 +30,7 @@ import org.apache.spark.sql.{AnalysisException, Dataset, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.classic.ClassicConversions._
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.connector.catalog.{Identifier, 
SupportsRowLevelOperations, TableCatalog}
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
SupportsRowLevelOperations, Table => CatalogTable, TableCatalog, TableInfo}
 import org.apache.spark.sql.pipelines.autocdc.{
   AutoCdcReservedNames,
   ChangeArgs,
@@ -38,7 +38,7 @@ import org.apache.spark.sql.pipelines.autocdc.{
   Scd1ForeachBatchHandler
 }
 import org.apache.spark.sql.pipelines.graph.QueryOrigin.ExceptionHelpers
-import org.apache.spark.sql.pipelines.util.SparkSessionUtils
+import org.apache.spark.sql.pipelines.util.{PipelinesCatalogUtils, 
SparkSessionUtils}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.ThreadUtils
@@ -330,11 +330,51 @@ object AutoCdcAuxiliaryTable {
    * Reserved table property key set on the auxiliary table to record which 
SCD strategy it
    * serves.
    */
-  val scdTypePropertyKey: String = 
s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scd_type"
+  val scdTypePropertyKey: String = 
s"${PipelinesTableProperties.pipelinesPrefix}autocdc.scdType"
+
+  /**
+   * Table property recording the auxiliary table's unquoted AutoCDC key 
column names as a JSON
+   * string array (e.g. `["id","region"]`). Written once when the auxiliary 
table is created and is
+   * considered immutable; full-refresh is the only way to change it.
+   */
+  val keyColumnNamesProperty: String =
+    s"${PipelinesTableProperties.pipelinesPrefix}autocdc.keyColumnNames"
+
+  /**
+   * Serialize key column names to the JSON form stored at 
[[keyColumnNamesProperty]].
+   * Round-trips an empty list as `[]`; callers are expected to enforce a 
non-empty key set
+   * upstream.
+   */
+  def serializeKeyColumnNames(names: Seq[String]): String = {
+    import org.json4s.JsonAST.{JArray, JString}
+    import org.json4s.jackson.JsonMethods.compact
+    compact(JArray(names.map(JString(_)).toList))
+  }
+
+  /**
+   * Parse a [[keyColumnNamesProperty]] value. `None` if it is not a JSON 
array of strings.
+   * Round-trips an empty list as `[]`; callers are expected to enforce a 
non-empty key set
+   * upstream.
+   */
+  def parseKeyColumnNames(raw: String): Option[Seq[String]] = {
+    import org.json4s.JsonAST.{JArray, JString}
+    import org.json4s.jackson.JsonMethods.parse
+    val parsed = try Some(parse(raw)) catch { case NonFatal(_) => None }
+    parsed.flatMap {
+      case JArray(elems) =>
+        val names = elems.collect { case JString(s) => s }
+        if (names.size == elems.size) Some(names) else None
+      case _ => None
+    }
+  }
 }
 
 /**
  * Base trait for AutoCDC merge-based write flows.
+ *
+ * Today, this trait and its children manage auxiliary table creation and 
validation across
+ * pipeline executions. Eventually we should evolve DatasetManager to be aware 
of the concept of
+ * auxiliary tables, and streamline creation/validation there.
  */
 trait AutoCdcMergeWriteBase {
   /** The spark session the AutoCDC flow is going to be planned in. */
@@ -343,6 +383,9 @@ trait AutoCdcMergeWriteBase {
   /** The destination (target) table entity the AutoCDC flow will be writing 
to. */
   protected def destination: Table
 
+  /** The AutoCDC flow's identifier, used as `flowName` in error messages 
emitted by this mixin. */
+  protected def identifier: TableIdentifier
+
   /** The AutoCDC flow's [[ChangeArgs]] (keys, sequencing, columnSelection, 
...). */
   protected def changeArgs: ChangeArgs
 
@@ -350,37 +393,82 @@ trait AutoCdcMergeWriteBase {
   protected def auxiliaryTableSchema: StructType
 
   /**
-   * Idempotently create the auxiliary table for [[destination]] if it does 
not already exist
-   * and return its [[TableIdentifier]].
+   * Create the auxiliary table for [[destination]] if it does not already 
exist and return its
+   * [[TableIdentifier]].
    *
-   * Note that this is `CREATE TABLE IF NOT EXISTS`: when the aux table 
already exists, its
-   * schema is left untouched and `auxiliaryTableSchema` is ignored. For SCD1, 
they keys must be
-   * invariant across executions and the CDC metadata will always be present, 
so this is correct.
+   * When the aux table already exists, its schema and properties are left 
untouched. For SCD1
+   * the keys must be invariant across executions and the CDC metadata is 
always present, so
+   * this is correct; drift validation reads the recorded 
`keyColumnNamesProperty` to enforce
+   * the invariant before this method is called.
    */
   protected def createAuxiliaryTableIfNotExists(spark: SparkSession): 
TableIdentifier = {
     val auxIdent = AutoCdcAuxiliaryTable.identifier(destination.identifier)
-    // The auxiliary table inherits the target's format so MERGE semantics 
line up. When the
-    // target's format is unspecified (None), omit the USING clause and fall 
back to the
-    // session's default source provider.
-    val usingClause = destination.format.map(fmt => s"USING 
$fmt").getOrElse("")
-    val tblPropertiesClause =
-      s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.scdTypePropertyKey}' = " +
-        s"'${changeArgs.storedAsScdType.label}')"
-    spark.sql(
-      s"""CREATE TABLE IF NOT EXISTS
-         |${auxIdent.quotedString}
-         |(${auxiliaryTableSchema.toDDL}) $usingClause 
$tblPropertiesClause""".stripMargin
-    )
+    val (catalog, v2Identifier) = 
PipelinesCatalogUtils.resolveTableCatalog(spark, auxIdent)
+
+    if (!catalog.tableExists(v2Identifier)) {
+      val properties = scala.collection.mutable.Map.empty[String, String]
+
+      // Inherit the target's format so MERGE semantics line up. When 
unspecified, omit the
+      // provider so the catalog falls back to its default.
+      destination.format.foreach { fmt => 
properties(TableCatalog.PROP_PROVIDER) = fmt }
+
+      // Record which SCD strategy this auxiliary table serves so downstream 
readers can
+      // identify it without having to inspect the schema.
+      properties(AutoCdcAuxiliaryTable.scdTypePropertyKey) = 
changeArgs.storedAsScdType.label
+
+      // Persist the AutoCDC key column names as a JSON list on first 
creation. The value
+      // is stored verbatim by the catalog.
+      properties(AutoCdcAuxiliaryTable.keyColumnNamesProperty) =
+        AutoCdcAuxiliaryTable.serializeKeyColumnNames(auxiliaryKeyColumnNames)
+
+      // Table creation is not atomic with the table exists check, and 
[[createTable]] will fail
+      // with TableAlreadyExistsException if some asynchronous process creates 
the table between
+      // the [[tableExists]] check and [[createTable]]. This is both rare (we 
don't support
+      // multi-AutoCDC-flow targets so there are no race conditions within a 
single pipeline) and
+      // acceptable - users can cleanly retry the failed flow when this 
happens. SQL offers an
+      // atomic CREATE IF NOT EXISTS, but would require special casing of the 
table properties
+      // in DDL and we would lose compile-time syntax and type safety.
+      catalog.createTable(
+        v2Identifier,
+        new TableInfo.Builder()
+          
.withColumns(CatalogV2Util.structTypeToV2Columns(auxiliaryTableSchema))
+          .withProperties(properties.asJava)
+          .build()
+      )
+    }
     auxIdent
   }
 
+  /**
+   * Returns the resolved AutoCDC key column names as they appear in the 
auxiliary schema, in
+   * `changeArgs.keys` declaration order.
+   */
+  private def auxiliaryKeyColumnNames: Seq[String] = {
+    val resolver = spark.sessionState.conf.resolver
+    changeArgs.keys.map { key =>
+      auxiliaryTableSchema.fields
+        .find(field => resolver(field.name, key.name))
+        .map(_.name)
+        .getOrElse(
+          // This should never happen at this point, as [[AutoCdcMergeFlow]] 
should have validated
+          // all changeArgs.keys exist in the deduced aux/target table schemas 
by now.
+          throw SparkException.internalError(
+            s"AutoCDC key column '${key.name}' is missing from the auxiliary 
table schema " +
+            s"for flow ${identifier.unquotedString} writing to target " +
+            s"${destination.identifier.quotedString}."
+          )
+        )
+    }
+  }
+
   /**
    * Validate that the target table's underlying connector implements
    * [[SupportsRowLevelOperations]], which is the V2 connector contract for 
MERGE/UPDATE/DELETE
    * with rewrite - all operations that the AutoCDC transformation executes.
    */
   protected def requireDestinationSupportsRowLevelOps(): Unit = {
-    val (catalog, v2Identifier) = resolveTableCatalog(spark, 
destination.identifier)
+    val (catalog, v2Identifier) =
+      PipelinesCatalogUtils.resolveTableCatalog(spark, destination.identifier)
     val destinationTable = catalog.loadTable(v2Identifier)
 
     if (!destinationTable.isInstanceOf[SupportsRowLevelOperations]) {
@@ -399,23 +487,123 @@ trait AutoCdcMergeWriteBase {
     }
   }
 
-  private def resolveTableCatalog(
-      spark: SparkSession,
-      ident: TableIdentifier): (TableCatalog, Identifier) = {
-    val catalogManager = spark.sessionState.catalogManager
-    val catalogPlugin = ident.catalog
-      .map(catalogManager.catalog)
-      .getOrElse(catalogManager.currentCatalog)
-    val catalog = catalogPlugin match {
-      case t: TableCatalog => t
-      case _ => throw 
QueryCompilationErrors.missingCatalogTablesAbilityError(catalogPlugin)
+  /**
+   * If the auxiliary table for this flow's destination already exists, 
validate that the
+   * AutoCDC keys the flow expects line up with the keys recorded in the 
auxiliary
+   * table. On a fresh pipeline (or after a full refresh dropped the 
auxiliary), the
+   * auxiliary is absent and there's nothing to drift from, so this is a no-op.
+   */
+  protected def validateNoAutoCdcKeyDriftIfAuxTableExists(): Unit = {
+    val auxIdent = AutoCdcAuxiliaryTable.identifier(destination.identifier)
+    val (catalog, v2Identifier) = 
PipelinesCatalogUtils.resolveTableCatalog(spark, auxIdent)
+    if (catalog.tableExists(v2Identifier)) {
+      validateNoAutoCdcKeyDrift(catalog.loadTable(v2Identifier), auxIdent)
+    }
+  }
+
+  /**
+   * Validate that the AutoCDC key columns the flow expects match the keys 
recorded in the
+   * existing auxiliary table at [[auxIdent]] as a set: same arity, same set 
of names (per the
+   * session resolver), same per-name `dataType`s.
+   */
+  private def validateNoAutoCdcKeyDrift(
+      existingAuxTable: CatalogTable,
+      auxIdent: TableIdentifier): Unit = {
+    val resolver = spark.sessionState.conf.resolver
+    val existingAuxSchema = 
CatalogV2Util.v2ColumnsToStructType(existingAuxTable.columns())
+
+    // The expected key fields are looked up in [[auxiliaryTableSchema]], 
which by construction
+    // contains every key column with its source-derived dataType. We 
deliberately do not look
+    // them up in [[existingAuxSchema]] - that's the recorded side, and 
conflating the two
+    // sides would mask drift.
+    val expectedKeyFields: Seq[StructField] = changeArgs.keys.map { key =>
+      auxiliaryTableSchema.fields
+        .find(field => resolver(field.name, key.name))
+        .getOrElse(
+          // Construction of [[auxiliaryTableSchema]] already enforces all of 
the user-specified
+          // keys are present, so if we don't find a key it is truly an 
internal error.
+          throw SparkException.internalError(
+            s"Key column '${key.name}' was not found in the AutoCDC auxiliary 
table schema."
+          )
+        )
+    }
+    val recordedKeyNames = parseRecordedKeyColumnNames(existingAuxTable, 
auxIdent)
+    val recordedKeyFields: Seq[StructField] = recordedKeyNames.map { name =>
+      existingAuxSchema.fields
+        .find(field => resolver(field.name, name))
+        .getOrElse(
+          // Either an implementation bug or, more likely, the user has 
corrupted the auxiliary
+          // table schema (e.g. dropped the key column). The remedy is 
full-refresh in either
+          // case.
+          throw new AnalysisException(
+            errorClass = 
"AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_KEY_COLUMN_MISSING",
+            messageParameters = Map(
+              "flowName" -> identifier.unquotedString,
+              "auxTableName" -> auxIdent.unquotedString,
+              "keyColumnName" -> name,
+              "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+            )
+          )
+        )
+    }
+
+    val drifted =
+      // Arity drift (added or dropped keys).
+      recordedKeyFields.length != expectedKeyFields.length ||
+      // Name or dataType drift: every expected key must have a same-name 
(resolver-aware)
+      // recorded counterpart with an equivalent dataType. Columns changing 
nullability and
+      // metadata in the schema are intentionally tolerated, although null key 
values during
+      // microbatch execution will be invalidated regardless.
+      expectedKeyFields.exists { expected =>
+        recordedKeyFields.find(rf => resolver(rf.name, expected.name)) match {
+          case None => true
+          case Some(recorded) => !recorded.dataType.sameType(expected.dataType)
+        }
+      }
+
+    if (drifted) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+        messageParameters = Map(
+          "flowName" -> identifier.unquotedString,
+          "auxTableName" -> auxIdent.unquotedString,
+          "expectedKeySchema" -> StructType(expectedKeyFields).toDDL,
+          "recordedKeySchema" -> StructType(recordedKeyFields).toDDL
+        )
+      )
+    }
+  }
+
+  /**
+   * Read the [[AutoCdcAuxiliaryTable.keyColumnNamesProperty]] off an existing 
auxiliary table
+   * and parse it into the ordered list of recorded AutoCDC key column names.
+   */
+  private def parseRecordedKeyColumnNames(
+      existingAuxTable: CatalogTable,
+      auxIdent: TableIdentifier): Seq[String] = {
+    val rawKeyColumnNamesStr = Option(
+      
existingAuxTable.properties().get(AutoCdcAuxiliaryTable.keyColumnNamesProperty)
+    ).getOrElse {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MISSING",
+        messageParameters = Map(
+          "flowName" -> identifier.unquotedString,
+          "auxTableName" -> auxIdent.unquotedString,
+          "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+        )
+      )
     }
-    val namespace = ident.database.getOrElse(
-      throw SparkException.internalError(
-        s"Cannot resolve table identifier ${ident.quotedString}: namespace is 
unspecified."
+    AutoCdcAuxiliaryTable.parseKeyColumnNames(rawKeyColumnNamesStr).getOrElse {
+      throw new AnalysisException(
+        errorClass = 
"AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MALFORMED",
+        messageParameters = Map(
+          "flowName" -> identifier.unquotedString,
+          "auxTableName" -> auxIdent.unquotedString,
+          "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty,
+          "rawValue" -> rawKeyColumnNamesStr
+        )
       )
-    )
-    (catalog, Identifier.of(Array(namespace), ident.table))
+    }
   }
 }
 
@@ -435,6 +623,7 @@ class Scd1MergeStreamingWrite(
 ) extends StreamingFlowExecution with AutoCdcMergeWriteBase {
 
   requireDestinationSupportsRowLevelOps()
+  validateNoAutoCdcKeyDriftIfAuxTableExists()
 
   override def getOrigin: QueryOrigin = flow.origin
 
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/util/PipelinesCatalogUtils.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/util/PipelinesCatalogUtils.scala
new file mode 100644
index 000000000000..8df9f128a25d
--- /dev/null
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/util/PipelinesCatalogUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.util
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/** Catalog-resolution helpers shared across the pipelines module. */
+object PipelinesCatalogUtils {
+
+  /**
+   * Resolve a v1 [[TableIdentifier]] to a `(TableCatalog, Identifier)` pair 
usable against the
+   * v2 connector APIs. If `ident.catalog` is unset, falls back to the 
session's
+   * `currentCatalog`.
+   */
+  def resolveTableCatalog(
+      spark: SparkSession,
+      ident: TableIdentifier): (TableCatalog, Identifier) = {
+    val catalogManager = spark.sessionState.catalogManager
+    val catalogPlugin = ident.catalog
+      .map(catalogManager.catalog)
+      .getOrElse(catalogManager.currentCatalog)
+    val catalog = catalogPlugin match {
+      case t: TableCatalog => t
+      case _ => throw 
QueryCompilationErrors.missingCatalogTablesAbilityError(catalogPlugin)
+    }
+    val namespace = ident.database.getOrElse(
+      throw SparkException.internalError(
+        s"Cannot resolve table identifier ${ident.quotedString}: namespace is 
unspecified."
+      )
+    )
+    (catalog, Identifier.of(Array(namespace), ident.table))
+  }
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcAuxiliaryTableSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcAuxiliaryTableSuite.scala
new file mode 100644
index 000000000000..9fb6070c01e7
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcAuxiliaryTableSuite.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.SparkFunSuite
+
+/**
+ * Unit tests for the [[AutoCdcAuxiliaryTable]] companion object, in 
particular the
+ * `serializeKeyColumnNames` / `parseKeyColumnNames` round-trip helpers used 
to persist the
+ * AutoCDC key column names as a JSON-encoded reserved table property on the 
auxiliary table.
+ *
+ * These tests are intentionally session-less: the helpers are pure functions 
on `String` and
+ * `Seq[String]`, and verifying their byte-for-byte round-trip contract 
requires no Spark
+ * runtime. End-to-end persistence (DDL -> catalog -> SHOW TBLPROPERTIES) is 
covered by
+ * `AutoCdcScd1AuxiliaryTableDurabilitySuite`; drift-validator behavior over 
the parsed
+ * property is covered by `AutoCdcScd1KeyDriftSuite`.
+ */
+class AutoCdcAuxiliaryTableSuite extends SparkFunSuite {
+
+  // The drift validator stores key column names in a table property as a JSON 
array of strings.
+  // These round-trip tests verify that identifier text is preserved verbatim 
through
+  // serialize -> parse, including characters that JSON itself must escape 
(`"`, `\`, control
+  // chars) and characters that JSON does not touch but that downstream 
interpolation might
+  // (`'`, ` `, `.`, backtick). Storage at the table property level is solely 
the JSON layer's
+  // concern -- SQL identifier quoting (backticks) is never part of the stored 
bytes.
+
+  private def assertKeyColumnNamesRoundTrip(names: Seq[String]): Unit = {
+    val json = AutoCdcAuxiliaryTable.serializeKeyColumnNames(names)
+    assert(
+      AutoCdcAuxiliaryTable.parseKeyColumnNames(json).contains(names),
+      s"round-trip failed: input=${names}, serialized=${json}"
+    )
+  }
+
+  test("serializeKeyColumnNames/parseKeyColumnNames round-trip preserves plain 
ASCII names") {
+    assertKeyColumnNamesRoundTrip(Seq("id"))
+    assertKeyColumnNamesRoundTrip(Seq("id", "region"))
+    assertKeyColumnNamesRoundTrip(Seq("id", "region", "country"))
+  }
+
+  test("serializeKeyColumnNames/parseKeyColumnNames round-trip preserves the 
empty list") {
+    // Empty key sets are not user-reachable (AutoCdcMergeFlow rejects them 
upstream), but the
+    // helpers themselves must round-trip a `[]` JSON array faithfully.
+    assertKeyColumnNamesRoundTrip(Seq.empty)
+  }
+
+  test("serializeKeyColumnNames/parseKeyColumnNames preserves names containing 
JSON-escaped " +
+    "characters (quote, backslash, control chars)") {
+    // JSON serializer must escape `"` -> `\"`, `\` -> `\\`, and control 
chars; the parser
+    // must invert those escapes and yield the original literal bytes.
+    assertKeyColumnNamesRoundTrip(Seq("a\"b"))
+    assertKeyColumnNamesRoundTrip(Seq("a\\b"))
+    assertKeyColumnNamesRoundTrip(Seq("a\nb"))
+    assertKeyColumnNamesRoundTrip(Seq("a\tb"))
+    // Mixed: every JSON-escaped class in a single name.
+    assertKeyColumnNamesRoundTrip(Seq("a\"b\\c\nd"))
+  }
+
+  test("serializeKeyColumnNames/parseKeyColumnNames preserves names containing 
characters " +
+    "that JSON does not escape (single quote, dot, space, backtick)") {
+    // JSON does not escape these, but they are common in real-world 
identifiers (especially
+    // when users backtick-quote at the API boundary). They must flow through 
verbatim.
+    assertKeyColumnNamesRoundTrip(Seq("it's"))
+    assertKeyColumnNamesRoundTrip(Seq("a.b"))
+    assertKeyColumnNamesRoundTrip(Seq("name with spaces"))
+    assertKeyColumnNamesRoundTrip(Seq("a`b"))
+    // Mixed: a single composite key whose pieces collectively touch every 
"passes verbatim"
+    // class.
+    assertKeyColumnNamesRoundTrip(Seq("it's", "name with spaces", "a.b.c", 
"back`tick"))
+  }
+
+  test("parseKeyColumnNames returns None for inputs that are not a JSON array 
of strings") {
+    // None of these are a top-level JSON array of strings; the parser must 
reject every shape
+    // with `None` so callers can surface a structured 
AUXILIARY_TABLE_PROPERTY_MALFORMED error with consistent wording.
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("not-json").isEmpty)
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("").isEmpty)
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("\"id\"").isEmpty)        
// bare string
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("null").isEmpty)
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("{\"id\": 1}").isEmpty)   
// object
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[1, 2, 3]").isEmpty)     
// numbers
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[\"id\", 1]").isEmpty)   
// mixed types
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[\"id\", null]").isEmpty)
+    assert(AutoCdcAuxiliaryTable.parseKeyColumnNames("[[\"id\"]]").isEmpty)    
// nested array
+  }
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
index 5e2286a4fd56..5ebdb4b4c86d 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -53,10 +53,10 @@ trait AutoCdcGraphExecutionTestMixin extends 
BeforeAndAfterEach {
       s"spark.sql.catalog.$catalog",
       classOf[SharedTablesInMemoryRowLevelOperationTableCatalog].getName
     )
-    // Disable per-flow retries so failure-path tests (e.g. INCOMPATIBLE_DATA) 
surface the
-    // AnalysisException after the first attempt instead of going through the 
default 2 retries,
-    // which would otherwise emit duplicate FAILED events and inflate test 
runtime without
-    // changing the asserted outcome.
+    // Disable per-flow retries so failure-path tests (e.g. KEY_SCHEMA_DRIFT, 
INCOMPATIBLE_DATA)
+    // surface the AnalysisException after the first attempt instead of going 
through the default
+    // 2 retries, which would otherwise emit duplicate FAILED events and 
inflate test runtime
+    // without changing the asserted outcome.
     spark.conf.set(SQLConf.PIPELINES_MAX_FLOW_RETRY_ATTEMPTS.key, "0")
     spark.sql(s"CREATE NAMESPACE IF NOT EXISTS $catalog.$namespace")
   }
@@ -98,7 +98,7 @@ trait AutoCdcGraphExecutionTestMixin extends BeforeAndAfterEach {
 
   /**
    * Walk every [[Throwable]] reachable from `failure` via [[Throwable#getSuppressed]] and
-   * [[Throwable#getCause]], searching for the first [[SparkThrowable]] whose
+   * [[Throwable#getCause]] for the first [[SparkThrowable]] whose
    * [[SparkThrowable#getCondition]] equals `condition`, then run [[checkError]] against that
    * exception with all of its other arguments propagated through.
    */
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
index 50ff60556a73..5a9f6cb6710b 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -158,6 +158,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
     // The auxiliary table only contains keys and the metadata column, hence "name" should not be
     // included.
     assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName))
+    assert(getAuxTableKeyColumnNames(target = "target") == Seq("id"))
   }
 
   test("the auxiliary table preserves the user's declared key order, independent of the " +
@@ -169,8 +170,9 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
     // _cdc_metadata) -- same ordering as the source. The user, however, declares
     // `keys = Seq("region", "id")` -- the OPPOSITE order from how those columns appear in
     // both the source DF and the target. The auxiliary table should honor the user's
-    // declared key order, not either physical column ordering, so subsequent runs
-    // positionally compare keys against the same recorded layout.
+    // declared key order, both in the persisted aux schema layout and in the
+    // [[AutoCdcAuxiliaryTable.keyColumnNamesProperty]] property value, so subsequent runs
+    // compare keys against the same recorded layout.
     spark.sql(
       s"CREATE TABLE $catalog.$namespace.target " +
       s"(value STRING, id INT NOT NULL, region STRING NOT NULL, " +
@@ -194,6 +196,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
     val auxSchema = spark.table(auxTableNameFor("target")).schema
     assert(auxSchema.fieldNames.toSeq ==
       Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName))
+    assert(getAuxTableKeyColumnNames(target = "target") == Seq("region", "id"))
   }
 
   test("if the AutoCDC auxiliary table is dropped between runs, it is transparently " +
@@ -238,4 +241,87 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
     )
   }
 
+  test("auxiliary key-column-names property survives identifiers containing special " +
+    "characters that exercise both JSON and SQL string-literal escaping") {
+    val session = spark
+    import session.implicits._
+
+    // This test exercises the full identifier-text persistence path with composite keys whose
+    // names collectively cover every escape class:
+    //   - `it's`              -- single quote: not escaped by JSON; the writer must double it
+    //                            to `''` to keep the SQL TBLPROPERTIES literal well-formed.
+    //   - `name with spaces`  -- whitespace identifier: backtick-quoted in DDL, no escaping
+    //                            needed in the JSON or the property value.
+    //   - `a"b`               -- literal double quote: JSON escapes as `\"`.
+    //   - `c\d`               -- literal backslash: JSON escapes as `\\`.
+    // If any layer drops, splits, or misescapes a name, the post-run lookup of the
+    // [[AutoCdcAuxiliaryTable.keyColumnNamesProperty]] property either fails to read or
+    // returns a value that is no longer a parseable JSON array of strings.
+    val keyNames = Seq("it's", "name with spaces", "a\"b", "c\\d")
+
+    // SQL DDL identifier rendering: backticks delimit each identifier; an embedded backtick
+    // would have to be escaped by doubling, but none of these names contain one.
+    val targetTableDdl = keyNames
+      .map(name => s"`$name` STRING NOT NULL")
+      .mkString(", ") + s", version BIGINT NOT NULL, $cdcMetadataDdl"
+    spark.sql(s"CREATE TABLE $catalog.$namespace.target ($targetTableDdl)")
+
+    // The AutoCDC API runs every key through `UnqualifiedColumnName.apply`, which calls
+    // `CatalystSqlParser.parseMultipartIdentifier`. To get a single-part identifier whose
+    // text includes special characters, the API caller has to backtick-quote at the boundary;
+    // we mirror that here by wrapping each name in backticks (and doubling any embedded
+    // backtick -- not needed for these names but kept for parity with how a user would call
+    // the API).
+    val backtickQuotedKeys = keyNames.map(name => s"`${name.replace("`", "``")}`")
+
+    // Single MemoryStream reused across both runs so the streaming checkpoint can resume.
+    val stream = MemoryStream[(String, String, String, String, Long)]
+    def buildCtx(): TestGraphRegistrationContext = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "auto_cdc_flow",
+        target = "target",
+        query = dfFlowFunc(
+          stream.toDF().toDF((keyNames :+ "version"): _*)
+        ),
+        keys = backtickQuotedKeys,
+        sequencing = functions.col("version")
+      ))
+    }
+
+    // Run #1: a single insert with arbitrary non-empty key values.
+    stream.addData(("v1", "v2", "v3", "v4", 1L))
+    runPipeline(buildCtx())
+
+    // The persisted property must round-trip every name byte-for-byte.
+    assert(getAuxTableKeyColumnNames(target = "target") == keyNames)
+
+    // Run #2: same keys, a higher sequence -- drift validation reads the property back, parses
+    // the JSON, and looks up each recorded name in the aux schema. If any layer mangled the
+    // identifier text (lost an escape, dropped a `'`, split on a `.`, ...), validation would
+    // either throw KEY_SCHEMA_DRIFT (name lookup miss) or INTERNAL_ERROR (recorded name absent
+    // from aux schema). Reaching the second run successfully proves the round-trip works.
+    stream.addData(("v1", "v2", "v3", "v4", 2L))
+    runPipeline(buildCtx())
+
+    // The persisted property is immutable across non-full-refresh runs, so it must still be
+    // intact after run #2.
+    assert(getAuxTableKeyColumnNames(target = "target") == keyNames)
+  }
+
+  private def getAuxTableKeyColumnNames(target: String): Seq[String] = {
+    val auxName = auxTableNameFor(target)
+    val rows = spark.sql(s"SHOW TBLPROPERTIES $auxName").collect()
+    val prop = rows
+      .find(_.getString(0) == AutoCdcAuxiliaryTable.keyColumnNamesProperty)
+      .getOrElse(fail(
+        s"auxiliary table $auxName is missing the " +
+        s"${AutoCdcAuxiliaryTable.keyColumnNamesProperty} property; got: ${rows.toSeq}"
+      ))
+    AutoCdcAuxiliaryTable.parseKeyColumnNames(prop.getString(1))
+      .getOrElse(fail(
+        s"auxiliary table $auxName has a malformed " +
+        s"${AutoCdcAuxiliaryTable.keyColumnNamesProperty} property: '${prop.getString(1)}'"
+      ))
+  }
 }
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
new file mode 100644
index 000000000000..066d8afd5342
--- /dev/null
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1KeyDriftSuite.scala
@@ -0,0 +1,566 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
+import org.apache.spark.sql.functions
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * End-to-end tests covering AutoCDC SCD1 key-drift validation: the AutoCDC flow's declared
+ * keys are validated against the auxiliary table's recorded keys at flow execution-init
+ * time. A change in keys across runs without a full refresh corrupts the merge semantics
+ * (rows mis-routed between insert/update); validation detects this and fails fast with a
+ * structured [[AUTOCDC_INVALID_STATE]] error.
+ *
+ * Each test seeds the auxiliary table by running a first pipeline with one set of keys, then
+ * runs a second pipeline with a different shape (new keys, dropped keys, swapped keys, drifted
+ * dataType, or with a tampered auxiliary table) and asserts on the structured failure.
+ */
+class AutoCdcScd1KeyDriftSuite
+    extends ExecutionTest
+    with SharedSparkSession
+    with AutoCdcGraphExecutionTestMixin {
+
+  test("a pipeline execution that adds a key column to an existing AutoCDC flow triggers " +
+    "KEY_SCHEMA_DRIFT") {
+    val session = spark
+    import session.implicits._
+
+    // Target table carries both candidate key columns up-front so only the AutoCDC `keys`
+    // declaration differs between the two pipelines.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, region STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1 declares one key (`id`). Aux table is created with schema (id, _cdc_metadata).
+    val stream1 = MemoryStream[(Int, String, Long)]
+    stream1.addData((1, "us", 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v1",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "region", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Pipeline #2 declares two keys (`region` + `id`) - arity drift.
+    val stream2 = MemoryStream[(Int, String, Long)]
+    stream2.addData((1, "us", 2L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v2",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "region", "version")),
+        keys = Seq("region", "id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("target"),
+        // `region` is nullable here because Scala `String` is a reference type and the
+        // [[MemoryStream]] tuple encoder treats reference types as nullable. Only Scala
+        // primitives (`Int`, `Long`, ...) yield `NOT NULL` columns.
+        "expectedKeySchema" -> "region STRING,id INT NOT NULL",
+        "recordedKeySchema" -> "id INT NOT NULL"
+      )
+    )
+  }
+
+  test("a pipeline execution that drops a key column from an existing AutoCDC flow triggers " +
+    "KEY_SCHEMA_DRIFT") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(region STRING NOT NULL, id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1 declares two keys [region, id]. Without strict-equality, the dropped `region`
+    // would slip through with `id` silently matching at position 0 of the recorded schema.
+    val stream1 = MemoryStream[(String, Int, Long)]
+    stream1.addData(("us", 1, 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v1",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("region", "id", "version")),
+        keys = Seq("region", "id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Pipeline #2 declares only [id] - arity drift.
+    val stream2 = MemoryStream[(String, Int, Long)]
+    stream2.addData(("us", 1, 2L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v2",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("region", "id", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("target"),
+        "expectedKeySchema" -> "id INT NOT NULL",
+        // `region` is nullable here because Scala `String` is a reference type; see the
+        // analogous comment in the "adds a key column" test above.
+        "recordedKeySchema" -> "region STRING,id INT NOT NULL"
+      )
+    )
+  }
+
+  test("a pipeline execution that swaps a key in an existing AutoCDC flow for a different name " +
+    "(same arity) triggers KEY_SCHEMA_DRIFT") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, region STRING NOT NULL, country STRING NOT NULL, " +
+      s"version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1 declares [id, region].
+    val stream1 = MemoryStream[(Int, String, String, Long)]
+    stream1.addData((1, "us", "USA", 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v1",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "region", "country", "version")),
+        keys = Seq("id", "region"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Pipeline #2 declares [id, country] - same arity, different key set. An arity-only check
+    // would silently match `id` at position 0 and the swapped `region`/`country` would slip
+    // through; the by-name set comparison must catch it.
+    val stream2 = MemoryStream[(Int, String, String, Long)]
+    stream2.addData((1, "us", "USA", 2L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v2",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "region", "country", "version")),
+        keys = Seq("id", "country"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("target"),
+        // `country` and `region` are nullable here because Scala `String` is a reference type;
+        // see the analogous comment in the "adds a key column" test above.
+        "expectedKeySchema" -> "id INT NOT NULL,country STRING",
+        "recordedKeySchema" -> "id INT NOT NULL,region STRING"
+      )
+    )
+  }
+
+  test("a pipeline whose recorded aux key dataType differs from the flow's source dataType " +
+    "triggers KEY_SCHEMA_DRIFT") {
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"""CREATE TABLE ${auxTableNameFor("target")} (id BIGINT NOT NULL, $cdcMetadataDdl) """ +
+      s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["id"]')"""
+    )
+
+    val session = spark
+    import session.implicits._
+    val stream = MemoryStream[(Int, Long)]
+    stream.addData((1, 1L))
+    val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("target"),
+        "expectedKeySchema" -> "id INT NOT NULL",
+        "recordedKeySchema" -> "id BIGINT NOT NULL"
+      )
+    )
+  }
+
+  test("a composite key reorder ([a,b] -> [b,a]) does NOT trigger drift validation") {
+    val session = spark
+    import session.implicits._
+
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(a INT NOT NULL, b STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1 declares keys [a, b] (in that order). Drift validation is order-independent:
+    // the recorded ordering is purely cosmetic for human-readable error messages and must not
+    // gate semantic equivalence, since the merge semantics depend only on the *set* of key
+    // columns and their dataTypes.
+    val stream1 = MemoryStream[(Int, String, Long)]
+    stream1.addData((1, "x", 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v1",
+        target = "target",
+        query = dfFlowFunc(stream1.toDF().toDF("a", "b", "version")),
+        keys = Seq("a", "b"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Pipeline #2 declares the same key set in the reversed order [b, a]. Must NOT throw.
+    val stream2 = MemoryStream[(Int, String, Long)]
+    stream2.addData((2, "y", 1L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v2",
+        target = "target",
+        query = dfFlowFunc(stream2.toDF().toDF("a", "b", "version")),
+        keys = Seq("b", "a"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx2)
+  }
+
+  test("a pipeline execution that changes a key column's nullability or metadata in an " +
+    "existing AutoCDC flow does NOT trigger drift") {
+    val session = spark
+    import session.implicits._
+
+    // Drift validation compares (name, dataType) pairs as a set. Nullability and column
+    // metadata are part of [[StructField]] but not part of [[DataType]], so they do not gate
+    // semantic equivalence: only the wire-format data type matters for merge correctness.
+    // Target's `id` is nullable so the second pipeline's nullable-`id` source is accepted.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1: source carries `id INT NOT NULL` (Scala primitive `Int`), no metadata.
+    val stream1 = MemoryStream[(Int, Long)]
+    stream1.addData((1, 1L))
+    runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+    // Pipeline #2: source carries `id INT` (nullable, via `Option[Int]`) AND attaches
+    // non-empty column metadata. Same name and `dataType` as the recorded key, but every
+    // [[StructField]] aspect outside `dataType` differs.
+    val stream2 = MemoryStream[(Option[Int], Long)]
+    stream2.addData((Some(2), 2L))
+    val baseDf = stream2.toDF().toDF("id", "version")
+    val md = new org.apache.spark.sql.types.MetadataBuilder()
+      .putString("description", "primary key")
+      .build()
+    val sourceDfWithMetadata = baseDf.select(baseDf("id").as("id", md), baseDf("version"))
+    runPipeline(buildPipeline("flow_v2", sourceDfWithMetadata, Seq("id")))
+  }
+
+  test("a pipeline execution that wraps an existing AutoCDC flow's key in backticks does NOT " +
+    "trigger drift") {
+    val session = spark
+    import session.implicits._
+
+    // Backticks are a SQL-parse syntactic device, not part of the identifier itself. A user
+    // adding or removing backticks around the same logical column must NOT be detected as drift.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long)]
+    stream1.addData((1, 1L))
+    runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+    val stream2 = MemoryStream[(Int, Long)]
+    stream2.addData((2, 1L))
+    runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("id", "version"), Seq("`id`")))
+  }
+
+  test("a pipeline execution that drops backticks around an existing AutoCDC flow's " +
+    "previously-backtick-quoted key does NOT trigger drift") {
+    val session = spark
+    import session.implicits._
+
+    // The reverse direction of the previous test: drift validation must be backtick-invariant
+    // on both the WRITE side (recorded property strips backticks when serializing the key
+    // names in pipeline #1) and the READ side (resolver-aware lookup strips backticks when
+    // pipeline #2's expected keys are matched against the recorded set).
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long)]
+    stream1.addData((1, 1L))
+    runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("`id`")))
+
+    val stream2 = MemoryStream[(Int, Long)]
+    stream2.addData((2, 1L))
+    runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("id", "version"), Seq("id")))
+  }
+
+  test("under spark.sql.caseSensitive = true, an AutoCDC flow whose key differs only in case " +
+    "from the recorded key triggers KEY_SCHEMA_DRIFT") {
+    val session = spark
+    import session.implicits._
+
+    // validateNoAutoCdcKeyDrift uses spark.sessionState.conf.resolver, so its behavior on
+    // `Id` vs `id` flips with the session conf. Pin the case-sensitive direction: pipeline #1
+    // seeds the aux table under the default resolver with recorded key `["id"]`, then
+    // pipeline #2 runs under the case-sensitive resolver with key `["Id"]`. Because `Id` and
+    // `id` are distinct identifiers under that resolver, drift validation must fail.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long)]
+    stream1.addData((1, 1L))
+    runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      // Source DF column is `Id` (capital) so the AutoCDC flow's own key-presence check
+      // (`requireKeysPresentInSelectedSchema`) succeeds under case-sensitive analysis.
+      // Drift validation is then the only remaining failure mode and it must fire.
+      val stream2 = MemoryStream[(Int, Long)]
+      stream2.addData((1, 2L))
+      val ctx2 = buildPipeline("flow_v2", stream2.toDF().toDF("Id", "version"), Seq("Id"))
+
+      val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+      checkErrorInPipelineFailure(
+        failure = ex,
+        condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+        sqlState = Some("42000"),
+        parameters = Map(
+          "flowName" ->
+            fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+          "auxTableName" -> auxTableNameFor("target"),
+          "expectedKeySchema" -> "Id INT NOT NULL",
+          "recordedKeySchema" -> "id INT NOT NULL"
+        )
+      )
+    }
+  }
+
+  test("under the default (case-insensitive) resolver, an AutoCDC flow whose key differs only " +
+    "in case from the recorded key does NOT trigger drift") {
+    val session = spark
+    import session.implicits._
+
+    // Pairs with the case-sensitive test above: same recorded key, but under the default
+    // resolver the two identifiers are equivalent so drift validation must accept pipeline
+    // #2. This pins the negative direction so a regression that accidentally hard-codes a
+    // case-sensitive resolver in the validator is caught.
+    //
+    // Note that only the *key declaration* (`Seq("Id")`) has different casing here -- the
+    // source DF column name still matches the target's `id` exactly. Differing the source DF
+    // column casing as well would not exercise drift: [[SchemaMergingUtils.mergeSchemas]] is
+    // case-sensitive on column names and would add `Id` as a new column to the target,
+    // producing AMBIGUOUS_REFERENCE during the streaming write rather than letting drift
+    // validation make the call.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val stream1 = MemoryStream[(Int, Long)]
+    stream1.addData((1, 1L))
+    runPipeline(buildPipeline("flow_v1", stream1.toDF().toDF("id", "version"), Seq("id")))
+
+    val stream2 = MemoryStream[(Int, Long)]
+    stream2.addData((1, 2L))
+    runPipeline(buildPipeline("flow_v2", stream2.toDF().toDF("id", "version"), Seq("Id")))
+  }
+
+  test("a pipeline whose aux table is missing the keyColumnNames property fails with " +
+    "AUXILIARY_TABLE_PROPERTY_MISSING") {
+    // Pre-create the aux table directly without the [[keyColumnNamesProperty]] to simulate
+    // corrupt metadata (e.g. user ran `ALTER TABLE ... UNSET TBLPROPERTIES`). Validation must
+    // surface a structured AUTOCDC_INVALID_STATE error rather than silently mis-validating keys.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    val session = spark
+    import session.implicits._
+    val stream = MemoryStream[(Int, Long)]
+    stream.addData((1, 1L))
+    val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MISSING",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("target"),
+        "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+      )
+    )
+  }
+
+  test("a pipeline whose aux table has a malformed keyColumnNames property fails with " +
+    "AUXILIARY_TABLE_PROPERTY_MALFORMED") {
+    // Pre-create the aux table directly with a non-JSON-array property value to simulate
+    // corrupt metadata. Validation must surface a structured AUTOCDC_INVALID_STATE error
+    // rather than letting a parse exception leak.
+    val malformedKeysArray = "not-a-json-array"
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl) " +
+      s"TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '$malformedKeysArray')"
+    )
+
+    val session = spark
+    import session.implicits._
+    val stream = MemoryStream[(Int, Long)]
+    stream.addData((1, 1L))
+    val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_PROPERTY_MALFORMED",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("target"),
+        "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty,
+        "rawValue" -> malformedKeysArray
+      )
+    )
+  }
+
+  test("a pipeline whose aux table records a key absent from its schema fails with " +
+    "AUXILIARY_TABLE_KEY_COLUMN_MISSING") {
+    // Pre-create the aux table directly with the [[keyColumnNamesProperty]] pointing at a
+    // column that does not exist in the aux schema. This is either a write-path implementation
+    // bug or external user tampering (e.g. dropping the key column); validation must surface a
+    // structured AUTOCDC_INVALID_STATE error rather than KEY_SCHEMA_DRIFT, because the drift
+    // validator cannot run without resolving every recorded key first.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.target " +
+      s"(id INT NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+    spark.sql(
+      s"""CREATE TABLE ${auxTableNameFor("target")} (id INT NOT NULL, $cdcMetadataDdl) """ +
+      s"""TBLPROPERTIES ('${AutoCdcAuxiliaryTable.keyColumnNamesProperty}' = '["region"]')"""
+    )
+
+    val session = spark
+    import session.implicits._
+    val stream = MemoryStream[(Int, Long)]
+    stream.addData((1, 1L))
+    val ctx = buildPipeline("flow", stream.toDF().toDF("id", "version"), Seq("id"))
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.AUXILIARY_TABLE_KEY_COLUMN_MISSING",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("target"),
+        "keyColumnName" -> "region",
+        "propertyName" -> AutoCdcAuxiliaryTable.keyColumnNamesProperty
+      )
+    )
+  }
+
+  /**
+   * Build a single-flow pipeline targeting `cat.ns1.target` with the given source DF and key
+   * column list.
+   */
+  private def buildPipeline(
+      flowName: String,
+      sourceDf: org.apache.spark.sql.classic.DataFrame,
+      keys: Seq[String]): TestGraphRegistrationContext = {
+    new TestGraphRegistrationContext(spark) {
+      registerTable("target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = flowName,
+        target = "target",
+        query = dfFlowFunc(sourceDf),
+        keys = keys,
+        sequencing = functions.col("version")
+      ))
+    }
+  }
+}
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
index 32f34923c480..0d3f6e954df3 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1MultiPipelineSuite.scala
@@ -293,4 +293,65 @@ class AutoCdcScd1MultiPipelineSuite
     )
   }
 
+  test("a second pipeline targeting an existing AutoCDC table with different keys " +
+    "fails with KEY_SCHEMA_DRIFT") {
+    val session = spark
+    import session.implicits._
+
+    // Target table with both candidate keys present so the second pipeline would otherwise
+    // be schema-compatible with the first; only the AutoCDC `keys` differ between flows.
+    spark.sql(
+      s"CREATE TABLE $catalog.$namespace.shared_target " +
+      s"(id INT NOT NULL, name STRING NOT NULL, version BIGINT NOT NULL, $cdcMetadataDdl)"
+    )
+
+    // Pipeline #1: AutoCDC flow keyed on `id`. Materializes the auxiliary table with schema
+    // (id, _cdc_metadata).
+    val stream1 = MemoryStream[(Int, String, Long)]
+    stream1.addData((1, "alice", 1L))
+    val ctx1 = new TestGraphRegistrationContext(spark) {
+      registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v1",
+        target = "shared_target",
+        query = dfFlowFunc(stream1.toDF().toDF("id", "name", "version")),
+        keys = Seq("id"),
+        sequencing = functions.col("version")
+      ))
+    }
+    runPipeline(ctx1)
+
+    // Pipeline #2: completely separate graph, but targets the same physical `shared_target`
+    // table with `keys = Seq("name")`.
+    val stream2 = MemoryStream[(Int, String, Long)]
+    stream2.addData((2, "alice", 1L))
+    val ctx2 = new TestGraphRegistrationContext(spark) {
+      registerTable("shared_target", catalog = Some(catalog), database = Some(namespace))
+      registerFlow(autoCdcFlow(
+        name = "flow_v2",
+        target = "shared_target",
+        query = dfFlowFunc(stream2.toDF().toDF("id", "name", "version")),
+        keys = Seq("name"),
+        sequencing = functions.col("version")
+      ))
+    }
+
+    val ex = intercept[RuntimeException] { runPipeline(ctx2) }
+    checkErrorInPipelineFailure(
+      failure = ex,
+      condition = "AUTOCDC_INVALID_STATE.KEY_SCHEMA_DRIFT",
+      sqlState = Some("42000"),
+      parameters = Map(
+        "flowName" ->
+          fullyQualifiedIdentifier("flow_v2", Some(catalog), Some(namespace)).unquotedString,
+        "auxTableName" -> auxTableNameFor("shared_target"),
+        // Pipeline #2's AutoCDC key resolves from the source DF, where `MemoryStream[(Int, String,
+        // Long)]` produces a nullable StringType for `name`.
+        "expectedKeySchema" -> "name STRING",
+        // Pipeline #1 persisted the aux table from a source DF whose `id` was a non-null Scala
+        // primitive (`Int`), so the recorded key carries `NOT NULL`.
+        "recordedKeySchema" -> "id INT NOT NULL"
+      )
+    )
+  }
 }
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
index 4c20b21ad57a..2424dbdc4e05 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1SchemaEvolutionSuite.scala
@@ -31,11 +31,10 @@ import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistratio
 import org.apache.spark.sql.test.SharedSparkSession
 
 /**
- * Tests covering AutoCDC's interaction with schema evolution across pipeline runs. The
- * suite documents the supported additive cases (new top-level columns, new nested fields
- * in array-of-struct, broadening / narrowing column selection) and the cases that fail
- * loudly today (subtractive nested evolution, type-incompatible changes, case-only
- * renames).
+ * Tests covering AutoCDC's interaction with non-key schema evolution across pipeline runs. The
+ * suite documents the supported additive cases (new top-level columns, new nested fields in
+ * array-of-struct, broadening / narrowing column selection) and the cases that fail loudly
+ * today (subtractive nested evolution, type-incompatible changes, case-only renames).
  *
  * These behaviors are largely inherited from the lower layers (`SchemaMergingUtils` for
  * schema merge, the v2 writer's column-resolution layer for nested-field handling) rather

