This is an automated email from the ASF dual-hosted git repository.

jose-torres pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new ecf21a215ce3 [SPARK-57134][SDP] Implement SCD2 Batch Processor; 
Preprocess Microbatch
ecf21a215ce3 is described below

commit ecf21a215ce34adc379853921d2370724389c354
Author: AnishMahto <[email protected]>
AuthorDate: Thu Jun 4 14:55:45 2026 -0700

    [SPARK-57134][SDP] Implement SCD2 Batch Processor; Preprocess Microbatch
    
    ### What changes were proposed in this pull request?
    **Preamble:**
    
    The SCD type 2 flow is a foreachBatch streaming query on an input 
change-data-feed, and is responsible for reconciling the incoming change data 
onto some target table that follows SCD2 replication semantics.
    
    SCD2 flows also maintain an "auxiliary" table to keep track of the state of 
early-arriving/out-of-order events. Each microbatch will need to 
reconcile against this auxiliary table as well, and update the auxiliary 
table's state appropriately for future microbatches.
    
    **Preprocess Microbatch**
    
    For SCD2, preprocessing the microbatch is all about getting it into the right 
shape: aligned with the shape of the target table the microbatch will be merged 
into, plus the shape that the SCD2 standard itself demands.
    
    That is:
    
    - The microbatch must have start-at and end-at columns projected as per the 
SCD2 standard, to indicate that a historical/alive record was active between 
those sequence stamps.
    - The microbatch will have the operational CDC metadata column projected, 
which is needed to reconcile late-arriving events/bookkeeping.
    - As per the Spark AutoCDC API, the microbatch should project down to just 
the user-specified column selection.
    
    Implement the part of the core SCD2 microbatch processor that does this 
microbatch preprocessing.
    
    ### Why are the changes needed?
    To support AutoCDC SCD2 transformations, as per the approved SPIP: 
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
    
    ### Does this PR introduce _any_ user-facing change?
    No. New feature.
    
    ### How was this patch tested?
    `Scd2BatchProcessorSuite`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Co-written with Claude Opus 4.7
    
    Closes #56208 from AnishMahto/SPARK-57134-SCD2-preprocess-microbatch.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: Jose Torres <[email protected]>
    (cherry picked from commit e4462556fafcc3aca42ec06896ac1ba75b52766a)
    Signed-off-by: Jose Torres <[email protected]>
---
 .../pipelines/autocdc/AutoCdcReservedNames.scala   |  11 +
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala |  13 +-
 .../sql/pipelines/autocdc/Scd2BatchProcessor.scala | 335 ++++++++++++++
 .../apache/spark/sql/pipelines/graph/Flow.scala    |   4 +-
 .../spark/sql/pipelines/graph/FlowExecution.scala  |   5 +-
 .../autocdc/AutoCdcCatalogExecutionTestBase.scala  |   4 +-
 .../sql/pipelines/autocdc/AutoCdcFlowSuite.scala   |  18 +-
 .../autocdc/Scd1BatchProcessorMergeSuite.scala     |  10 +-
 .../autocdc/Scd1BatchProcessorSuite.scala          |  26 +-
 .../autocdc/Scd1ForeachBatchHandlerSuite.scala     |  12 +-
 .../autocdc/Scd2BatchProcessorSuite.scala          | 515 +++++++++++++++++++++
 .../graph/AutoCdcGraphExecutionTestMixin.scala     |   3 +-
 .../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala |   6 +-
 .../AutoCdcScd1TargetTableDurabilitySuite.scala    |   6 +-
 14 files changed, 915 insertions(+), 53 deletions(-)

diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
index 2b0f8e293e76..8284441e9e2b 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
@@ -29,4 +29,15 @@ private[pipelines] object AutoCdcReservedNames {
 
   /** Common reserved-name prefix shared by AutoCDC internal columns and 
internal tables. */
   val prefix: String = "__spark_autocdc_"
+
+  /**
+   * Reserved name of the operational metadata column that AutoCDC 
projects onto every AutoCDC
+   * microbatch, auxiliary table, and target table.
+   *
+   * Shared across all SCD strategies and across the flow resolution, 
batch-processor, and
+   * streaming-write layers.
+   *
+   * Note, however, that the schema of the CDC metadata column can and does 
differ depending on the SCD type.
+   */
+  val cdcMetadataColName: String = s"${prefix}metadata"
 }
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 0656a7eb91b0..35006dc4ee21 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -143,7 +143,7 @@ case class Scd1BatchProcessor(
       F.when(rowDeleteSequence.isNull, 
changeArgs.sequencing).otherwise(F.lit(null))
 
     validatedMicrobatch.withColumn(
-      Scd1BatchProcessor.cdcMetadataColName,
+      AutoCdcReservedNames.cdcMetadataColName,
       Scd1BatchProcessor.constructCdcMetadataCol(
         deleteSequence = rowDeleteSequence,
         upsertSequence = rowUpsertSequence,
@@ -175,7 +175,7 @@ case class Scd1BatchProcessor(
       schema = microbatchWithCdcMetadataDf.schema,
       columnSelection = Some(
         ColumnSelection.ExcludeColumns(
-          Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
+          Seq(UnqualifiedColumnName(AutoCdcReservedNames.cdcMetadataColName))
         )
       ),
       caseSensitive = caseSensitiveColumnComparison
@@ -197,7 +197,7 @@ case class Scd1BatchProcessor(
         // select. Identifiers could have special characters such as '.'.
         F.col(QuotingUtils.quoteIdentifier(colName))
       }) :+ F.col(
-        Scd1BatchProcessor.cdcMetadataColName
+        AutoCdcReservedNames.cdcMetadataColName
       )
 
     microbatchWithCdcMetadataDf.select(
@@ -223,7 +223,7 @@ case class Scd1BatchProcessor(
     val aliasedMicrobatchDf = microbatchDf.alias("microbatch")
     val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable")
 
-    val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName
+    val cdcMetadata = AutoCdcReservedNames.cdcMetadataColName
 
     val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata")
     val effectiveSeq = F.greatest(
@@ -267,7 +267,7 @@ case class Scd1BatchProcessor(
       auxiliaryTableIdentifier: TableIdentifier
   ): Unit = {
     val auxIdentQuoted = auxiliaryTableIdentifier.quotedString
-    val meta = Scd1BatchProcessor.cdcMetadataColName
+    val meta = AutoCdcReservedNames.cdcMetadataColName
 
     // Project the reconciled microbatch down to just keys + `_cdc_metadata`; 
data columns are
     // irrelevant for the auxiliary table and should not be persisted.
@@ -330,7 +330,7 @@ case class Scd1BatchProcessor(
       reconciledMicrobatchDf: DataFrame,
       targetTableIdentifier: TableIdentifier
   ): Unit = {
-    val meta = Scd1BatchProcessor.cdcMetadataColName
+    val meta = AutoCdcReservedNames.cdcMetadataColName
 
     val destinationTableStr = targetTableIdentifier.quotedString
     // (Re-)alias the reconciled microbatch DF for easy reference for the 
remainder of the merge.
@@ -415,7 +415,6 @@ object Scd1BatchProcessor {
    * enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]] 
construction.
    */
   private[autocdc] val winningRowColName: String = 
s"${AutoCdcReservedNames.prefix}winning_row"
-  private[pipelines] val cdcMetadataColName: String = 
s"${AutoCdcReservedNames.prefix}metadata"
 
   private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence"
   private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence"
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala
new file mode 100644
index 000000000000..60fda36fea07
--- /dev/null
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{functions => F}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * Per-microbatch processor for SCD Type 2 AutoCDC flows, complying with the 
specified
+ * [[changeArgs]] configuration.
+ *
+ * @param changeArgs The CDC flow configuration.
+ * @param resolvedSequencingType The post-analysis [[DataType]] of the 
sequencing column, derived
+ *                               from the flow's resolved DataFrame at flow 
setup time.
+ */
+case class Scd2BatchProcessor(
+    changeArgs: ChangeArgs,
+    resolvedSequencingType: DataType) {
+
+  /**
+   * Reconcile a CDC microbatch into the canonical form the auxiliary- and 
target-table merges
+   * consume.
+   *
+   * Step ordering is load-bearing: the row-extension steps reference user 
data columns that
+   * target-column selection is allowed to drop, so selection runs last. 
Unlike SCD1, no per-key
+   * deduplication step is performed here - SCD2 preserves every event as part 
of the row's
+   * history, including byte-identical full-event duplicates.
+   *
+   * Duplicate event elimination (e.g., collapsing two identical events at the 
same sequence),
+   * whether across microbatches or within the same microbatch, is the 
responsibility of
+   * downstream reconciliation - not preprocessing.
+   *
+   * Produces a dataframe that retains every input row 1:1 - no rows added, 
dropped, reordered,
+   * or merged - with the following schema, in column order:
+   *   1. The user columns of `microbatchDf` that survive 
[[ChangeArgs.columnSelection]], in the
+   *      order they appeared in the input.
+   *   2. `__START_AT` column, populated with the sequence value of the row.
+   *   3. `__END_AT` column, populated with the sequence value of the row IFF 
it's a delete event,
+   *      null otherwise.
+   *   4. `__spark_autocdc_metadata` column.
+   */
+  private[autocdc] def preprocessMicrobatch(microbatchDf: DataFrame): 
DataFrame = {
+    microbatchDf
+      .transform(extendMicrobatchRowsWithStartAt)
+      .transform(extendMicrobatchRowsWithEndAt)
+      .transform(extendMicrobatchRowsWithCdcMetadata)
+      .transform(projectTargetColumnsOntoMicrobatch)
+  }
+
+  /**
+   * Stamp each microbatch row with its currently known start-at (i.e. 
active-from) using its
+   * sequencing.
+   */
+  private def extendMicrobatchRowsWithStartAt(microbatchDf: DataFrame): 
DataFrame = {
+    microbatchDf.withColumn(
+      colName = Scd2BatchProcessor.startAtColName,
+      col = changeArgs.sequencing.cast(resolvedSequencingType)
+    )
+  }
+
+  /**
+   * Stamp each microbatch delete event row with its end time sequence, as 
they are instantaneous
+   * events.
+   *
+   * Non-deletes leave a null end, as we do not yet know if the row represents 
an active upsert,
+   * or a closed upsert. This will become clear in later reconciliation 
against the aux/target
+   * tables.
+   */
+  private def extendMicrobatchRowsWithEndAt(microbatchDf: DataFrame): 
DataFrame = {
+    microbatchDf.withColumn(
+      colName = Scd2BatchProcessor.endAtColName,
+      col = (
+        changeArgs.deleteCondition match {
+          case Some(deleteCondition) =>
+            F.when(deleteCondition, 
changeArgs.sequencing).otherwise(F.lit(null))
+          case None =>
+            F.lit(null)
+        }
+      ).cast(resolvedSequencingType)
+    )
+  }
+
+  /**
+   * Project the operational CDC metadata column carrying the literal event 
sequence. Downstream
+   * merges rely on it to preserve original event lineage regardless of how 
rows start/end-at are
+   * coalesced.
+   */
+  private def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame): 
DataFrame = {
+    microbatchDf.withColumn(
+      colName = AutoCdcReservedNames.cdcMetadataColName,
+      col = Scd2BatchProcessor.constructTargetCdcMetadataCol(
+        recordStartAt = changeArgs.sequencing,
+        sequencingType = resolvedSequencingType
+      )
+    )
+  }
+
+  /**
+   * Apply the user's target column selection while preserving the SCD2 
framework columns; the
+   * latter are required by downstream merges and persisted to both the 
auxiliary and target
+   * tables, so users cannot deselect them.
+   *
+   * Requires the framework columns to already be present on the input.
+   */
+  private def projectTargetColumnsOntoMicrobatch(
+      microbatch: DataFrame
+  ): DataFrame = {
+    val caseSensitive = 
microbatch.sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+    // Strip the framework columns through the same case-aware path as the 
user selection, for
+    // consistency with Scd1BatchProcessor.projectTargetColumnsOntoMicrobatch.
+    val dataSchema = ColumnSelection.applyToSchema(
+      schemaName = "microbatch",
+      schema = microbatch.schema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(
+          
Scd2BatchProcessor.reservedFrameworkColNames.toSeq.map(UnqualifiedColumnName(_))
+        )
+      ),
+      caseSensitive = caseSensitive
+    )
+    val userSelectedDataSchema =
+      ColumnSelection.applyToSchema(
+        schemaName = "microbatch",
+        schema = dataSchema,
+        columnSelection = changeArgs.columnSelection,
+        caseSensitive = caseSensitive
+      )
+    val finalColumnsToSelect: Seq[Column] =
+      userSelectedDataSchema.fieldNames.toSeq.map(colName => {
+        // Spark drops backticks in the schema, quote all identifiers for 
safety before executing
+        // select. Identifiers could have special characters such as '.'.
+        F.col(QuotingUtils.quoteIdentifier(colName))
+      }) ++ Seq(
+        F.col(Scd2BatchProcessor.startAtColName),
+        F.col(Scd2BatchProcessor.endAtColName),
+        F.col(AutoCdcReservedNames.cdcMetadataColName)
+      )
+    microbatch.select(finalColumnsToSelect: _*)
+  }
+
+}
+
+/**
+ * Concept: run of upsert events.
+ *
+ * A run is a maximal sequence of consecutive upsert events (in sorted order 
by sequencing)
+ * for the same key whose tracked-history-column values are all identical. The 
transition
+ * from a previous run's tail to a new run's head represents a real state 
change; every
+ * subsequent event in the run is a no-op continuation that logically 
coalesces with the head.
+ *
+ * Runs matter because SCD2 only emits a new visible historical row when a
+ * tracked-history column actually changes. By convention we choose that only 
the tail of a
+ * run produces a visible row in the target table; the rest become hidden rows 
in the aux
+ * table. Selecting the tail means the latest no-op upsert is reflected in the 
target table.
+ *
+ * Example, with trackHistoryCols = [name], events for some key:
+ *   (S=5,  name=Alice)   -> starts run head at S=5. Row lives in aux table.
+ *   (S=10, name=Alice)   -> no-op, adds to run at S=5. Row lives in aux table.
+ *   (S=15, name=Alice)   -> no-op and tail of run at S=5. Row lives in target 
table with
+ *                           START_AT=5.
+ *   (S=20, name=Charlie) -> new run head/tail (run size=1) at S=20. Row lives 
in target
+ *                           table.
+ *
+ * Now if a new late-arriving event (S=12, name=Bob) arrives for the same key, 
we have:
+ *   (S=5,  name=Alice)   -> starts run head at S=5. Row lives in aux table.
+ *   (S=10, name=Alice)   -> no-op but now tail of run at S=5. Row now lives 
in target
+ *                           table with START_AT=5.
+ *   (S=12, name=Bob)     -> new run head/tail (run size=1) at S=12. Row lives 
in target
+ *                           table.
+ *   (S=15, name=Alice)   -> previously-visible tail converts to a new run 
head at S=15. Row
+ *                           remains in target table, but now with START_AT=15.
+ *   (S=20, name=Charlie) -> new run head at S=20. Row lives in target table.
+ *
+ * Note that if we did not track the no-op events in the aux table for the run 
at S=5 before the
+ * event (S=12, name=Bob) arrived, then we would not have correctly reconciled 
that the event
+ * (S=10, name=Alice) is now the visible tail of the Alice run before Bob.
+ *
+ * -------------
+ * Concept: target table.
+ *
+ * The user-consumable output table of the CDC transformation. Every row in 
the target table
+ * represents the visible tail of a run (maybe size 1), carrying the run 
head's START_AT and the
+ * latest row values for that run. The target table in its entirety represents 
the SCD2
+ * representation of the CDC flow's source table.
+ *
+ * -------------
+ * Concept: aux table.
+ *
+ * The side state table used to track out of order events from the CDC source. 
Two classes
+ * of events are represented as rows in this table:
+ *    1. Early-arriving deletes, with no matching upsert; this is considered a 
tombstone,
+ *       and may match with a late-arriving upsert in a future microbatch.
+ *    2. No-op upserts (i.e. non-tail members of runs); hidden no-op rows that may 
reconcile as
+ *       state-changing run heads in a future microbatch.
+ *
+ * The aux table is considered an internal table that users should neither 
tamper with nor consider
+ * part of the public contract.
+ *
+ * -------------
+ * Concept: decomposition tail.
+ *
+ * A transient and synthetic row produced by the batch processor during 
reconciliation (not
+ * from the CDC source) when a previously-closed historical row [START_AT=X, 
END_AT=Y] is
+ * bisected by a late-arriving event. The bisected row is split into a head
+ * [START_AT=X, END_AT=null] - inheriting the original row's data and 
`__RECORD_START_AT` -
+ * and a tail [START_AT=null, END_AT=Y, `__RECORD_START_AT`=null] that carries 
the original
+ * row's right boundary. The tail typically becomes the closing END_AT of a 
bisecting upsert,
+ * giving it a valid right boundary in the target-table history.
+ *
+ * Decomposition tails are uniquely identified by `__RECORD_START_AT` = null - 
the only row
+ * category with that property - and are never persisted in their tail form: 
each is either
+ * absorbed by the next event in the affected window (dropped as redundant) or 
promoted to a
+ * tombstone in the aux table if it survives reconciliation unmatched.
+ *
+ * -------------
+ * Concept: same-sequence tie-break between an upsert and a delete.
+ *
+ * When an upsert event and a delete event share the same `__RECORD_START_AT`, 
the delete wins:
+ * the visible upsert is dropped (as a zero-width interval) and only the 
tombstone is written
+ * to the aux table. The reverse pair (delete arriving first, then an upsert 
at the same
+ * sequence) is symmetric: the tombstone closes the upsert at the same 
instant, again leaving
+ * a zero-width visible interval that is dropped, and only the tombstone 
survives.
+ *
+ * This tie-break is an internal contract only - we do not publicly guarantee 
deterministic
+ * resolution when two events for the same key share a sequence value. Users 
who care about
+ * ordering should ensure their sequencing column is unique per (key, event).
+ */
+object Scd2BatchProcessor {
+  /**
+   * Metadata field that represents the exact time (sequence) of the CDC event 
that produced
+   * this row. Null only for synthetic decomposition tails.
+   */
+  private[autocdc] val recordStartAtFieldName: String = "__RECORD_START_AT"
+
+  /**
+   * What this column represents depends on which AutoCDC artifact table it is 
read from.
+   *
+   * In the target table:
+   *    The user-visible column representing when this row is considered 
active from, i.e.
+   *    this upsert run's head [[recordStartAtFieldName]].
+   * In the aux table:
+   *    If this row represents a tombstone, then the same value as 
[[recordStartAtFieldName]].
+   *    Else this row represents a coalesced no-op row that is part of an 
upsert run.
+   *    Inherit the [[recordStartAtFieldName]] of the head of this upsert's 
run.
+   *
+   * The invariant in both tables is: startAtColName <= 
recordStartAtFieldName. If an event was
+   * generated at time X, it is active by time X, or earlier if it is not a 
run head.
+   */
+  private[autocdc] val startAtColName: String = "__START_AT"
+
+  /**
+   * What this column represents depends on which AutoCDC artifact table it is 
read from.
+   *
+   * In the target table:
+   *    The user-visible column representing when this row became inactive. 
Null IFF the row
+   *    is active: neither superseded by a state-changing upsert nor affected 
by a delete.
+   * In the aux table:
+   *    If this row is a tombstone, then by convention the sequence of the 
delete event that
+   *    produced it. Delete events are considered instantaneous in time.
+   *    Else this row is a coalesced no-op row that is part of an upsert run, 
and by
+   *    convention the value will always be null.
+   */
+  private[autocdc] val endAtColName: String = "__END_AT"
+
+  /**
+   * Column names reserved by AutoCDC, that will be projected onto the 
microbatch and target
+   * tables. If the user's source dataframe contains any of these columns, 
SCD2 reconciliation
+   * will fail.
+   *
+   * TODO(SPARK-57251): validate at [[AutoCdcMergeFlow]] construction time 
that the source
+   *   schema and column selection do not collide with these reserved names, 
so we fail fast
+   *   with a user-actionable error instead of silently overwriting them at 
preprocess time.
+   */
+  private val reservedFrameworkColNames: Set[String] = Set(
+    startAtColName,
+    endAtColName,
+    AutoCdcReservedNames.cdcMetadataColName
+  )
+
+  /**
+   * Schema of the CDC metadata struct column for SCD2 target table rows.
+   */
+  private[pipelines] def targetCdcMetadataColSchema(sequencingType: DataType): 
StructType =
+    StructType(
+      Seq(
+        // The sequence value of the originating CDC event for this row. 
Nullable because
+        // decomposition tails, which are transient and synthetically 
constructed during
+        // reconciliation, have a null record start at.
+        StructField(recordStartAtFieldName, sequencingType, nullable = true)
+      )
+    )
+
+  /**
+   * Construct the CDC metadata struct column for SCD2 target/microbatch rows, 
following the
+   * exact schema and field ordering defined by [[targetCdcMetadataColSchema]].
+   */
+  private def constructTargetCdcMetadataCol(
+      recordStartAt: Column,
+      sequencingType: DataType
+  ): Column = {
+    val cdcMetadataFieldsInOrder = 
targetCdcMetadataColSchema(sequencingType).fields.map { field =>
+      val value = field.name match {
+        case `recordStartAtFieldName` => recordStartAt
+        case other =>
+          throw SparkException.internalError(
+            s"Unable to construct SCD2 target CDC metadata column due to 
unknown " +
+              s"`${other}` field."
+          )
+      }
+      value.cast(field.dataType).as(field.name)
+    }
+    F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
+  }
+}
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index f88b0cd3a1cb..dd4d1556afbf 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -288,7 +288,7 @@ class AutoCdcMergeFlow(
       // CDC operational metadata column at the end.
       StructType(
         userSelectedSchema.fields :+ StructField(
-          Scd1BatchProcessor.cdcMetadataColName,
+          AutoCdcReservedNames.cdcMetadataColName,
           Scd1BatchProcessor.cdcMetadataColSchema(sequencingType),
           nullable = false
         )
@@ -334,7 +334,7 @@ class AutoCdcMergeFlow(
           deleteSequence = F.lit(null),
           upsertSequence = F.lit(null),
           sequencingType = sequencingType
-        ).as(Scd1BatchProcessor.cdcMetadataColName)
+        ).as(AutoCdcReservedNames.cdcMetadataColName)
 
         df.select(userSelectedCols :+ emptyCdcMetadataCol: _*)
       case ScdType.Type2 =>
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index c4cd358344ca..5b5ec776c074 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -686,11 +686,12 @@ class Scd1MergeStreamingWrite(
   /** CDC metadata field resolved out of the flow's augmented schema. */
   private lazy val cdcMetadataField: StructField = {
     val resolver = updateContext.spark.sessionState.conf.resolver
+    val cdcMetadataColName = AutoCdcReservedNames.cdcMetadataColName
     flow.schema.fields
-      .find(field => resolver(field.name, 
Scd1BatchProcessor.cdcMetadataColName))
+      .find(field => resolver(field.name, cdcMetadataColName))
       .getOrElse(
         throw SparkException.internalError(
-          s"CDC metadata column '${Scd1BatchProcessor.cdcMetadataColName}' was 
not found in the " +
+          s"CDC metadata column '$cdcMetadataColName' was not found in the " +
           s"AutoCDC flow's target table schema."
         )
       )
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
index 0dc0a9027660..8688df071113 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
@@ -82,7 +82,7 @@ trait AutoCdcCatalogExecutionTestBase {
   }
 
   /**
-   * Schema of the [[Scd1BatchProcessor.cdcMetadataColName]] struct column for 
a given
+   * Schema of the [[AutoCdcReservedNames.cdcMetadataColName]] struct column 
for a given
    * sequencing column type. Defaults to [[LongType]] because all current SCD1 
tests use
    * `Long` sequencing.
    */
@@ -92,7 +92,7 @@ trait AutoCdcCatalogExecutionTestBase {
       .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, sequencingType)
 
   /**
-   * Build a [[Row]] matching the [[Scd1BatchProcessor.cdcMetadataColName]] 
struct's two fields,
+   * Build a [[Row]] matching the [[AutoCdcReservedNames.cdcMetadataColName]] 
struct's two fields,
    * in the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]]:
    */
   protected def cdcMetadataRow[T](deleteSeq: Option[T], upsertSeq: Option[T]): 
Row =
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
index cf7c9533bee9..32374f8ecb04 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -186,7 +186,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
 
   /** Convenience to extract the [[StructType]] of the projected 
`_cdc_metadata` column. */
   private def cdcMetadataStruct(schema: StructType): StructType =
-    
schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+    
schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType]
 
   test(
     "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when 
no " +
@@ -200,7 +200,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
       .add("seq", LongType)
       .add(
         StructField(
-          Scd1BatchProcessor.cdcMetadataColName,
+          AutoCdcReservedNames.cdcMetadataColName,
           Scd1BatchProcessor.cdcMetadataColSchema(LongType),
           nullable = false
         )
@@ -223,7 +223,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
       .add("seq", LongType)
       .add(
         StructField(
-          Scd1BatchProcessor.cdcMetadataColName,
+          AutoCdcReservedNames.cdcMetadataColName,
           Scd1BatchProcessor.cdcMetadataColSchema(LongType),
           nullable = false
         )
@@ -244,7 +244,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
       .add("seq", LongType)
       .add(
         StructField(
-          Scd1BatchProcessor.cdcMetadataColName,
+          AutoCdcReservedNames.cdcMetadataColName,
           Scd1BatchProcessor.cdcMetadataColSchema(LongType),
           nullable = false
         )
@@ -270,7 +270,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
   test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with 
nullable inner fields") {
     val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
 
-    val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName)
+    val metaField = 
resolvedFlow.schema(AutoCdcReservedNames.cdcMetadataColName)
     assert(!metaField.nullable, "_cdc_metadata column itself must be non-null")
 
     val metaStruct = metaField.dataType.asInstanceOf[StructType]
@@ -330,7 +330,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     // The user-selected portion drops `name`; the trailing column is the SCD1 
metadata.
     assert(
       loadedDf.schema.fieldNames.toSeq ==
-      Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+      Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName)
     )
   }
 
@@ -345,7 +345,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     assert(loadedDf.schema == resolvedFlow.schema)
     assert(
       loadedDf.schema.fieldNames.toSeq ==
-      Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+      Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName)
     )
   }
 
@@ -442,7 +442,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
     // Locks in the previous engine-level guard at flow-construction time. Any 
future
     // regression where a user-supplied CDC stream carries the reserved 
metadata column name
     // should fail eagerly here.
-    val sourceDf = 
sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType)
+    val sourceDf = 
sourceDfWithExtraColumns(AutoCdcReservedNames.cdcMetadataColName -> StringType)
 
     checkError(
       exception = intercept[AnalysisException] {
@@ -452,7 +452,7 @@ class AutoCdcFlowSuite extends QueryTest with 
SharedSparkSession {
       sqlState = "42710",
       parameters = Map(
         "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
-        "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
+        "columnName" -> AutoCdcReservedNames.cdcMetadataColName,
         "schemaName" -> "changeDataFeed",
         "reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
       )
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
index 475d25f5aa2c..1aa2cbcd5417 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
@@ -47,13 +47,13 @@ class Scd1BatchProcessorMergeSuite
    */
   private val minimalSchema: StructType = new StructType()
     .add("id", IntegerType)
-    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+    .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
 
   /** Minimal target-table shape: one key, one data column, and CDC metadata. 
*/
   private val targetSchema: StructType = new StructType()
     .add("id", IntegerType)
     .add("value", StringType)
-    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+    .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
 
   /**
    * A processor with a single key column `id`. `sequencing` is irrelevant for
@@ -85,7 +85,7 @@ class Scd1BatchProcessorMergeSuite
     val withKeys = keyColumns.foldLeft(new StructType()) { case (s, (name, 
dt)) =>
       s.add(name, dt)
     }
-    withKeys.add(Scd1BatchProcessor.cdcMetadataColName, 
cdcMetadataColSchemaType())
+    withKeys.add(AutoCdcReservedNames.cdcMetadataColName, 
cdcMetadataColSchemaType())
   }
 
   /**
@@ -116,7 +116,7 @@ class Scd1BatchProcessorMergeSuite
       .add("id", IntegerType)
       .add("value", StringType)
       .add(
-        Scd1BatchProcessor.cdcMetadataColName,
+        AutoCdcReservedNames.cdcMetadataColName,
         new StructType()
           .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
           .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
@@ -446,7 +446,7 @@ class Scd1BatchProcessorMergeSuite
       // The schema always stores the backtick consumed column name, so 
unticked the raw name here.
       .add(rawKeyName, IntegerType)
       .add("value", StringType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
 
     createTable(
       defaultTargetIdent,
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index 9432150c4016..d2c78442c476 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -33,7 +33,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     .add("name", StringType)
     .add("age", IntegerType)
     .add(
-      Scd1BatchProcessor.cdcMetadataColName,
+      AutoCdcReservedNames.cdcMetadataColName,
       new StructType()
         .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
         .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
@@ -596,7 +596,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     // Original columns are preserved in their original order, with CDC 
metadata appended at
     // the very end.
     assert(result.schema.fieldNames.toSeq ==
-      schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName)
+      schema.fieldNames.toSeq :+ AutoCdcReservedNames.cdcMetadataColName)
   }
 
   test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence 
fields to " +
@@ -624,7 +624,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch)
 
     val cdcMetadataDataType =
-      
resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+      
resultDf.schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType]
     assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq(
       Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType,
       Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType))
@@ -723,7 +723,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     val result = processor.projectTargetColumnsOntoMicrobatch(batch)
 
     assert(result.schema.fieldNames.toSeq ==
-      Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+      Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName))
     checkAnswer(
       df = result,
       expectedAnswer = Row(1, 30, Row(null, 10L))
@@ -753,7 +753,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
 
     assert(
       result.schema.fieldNames.toSeq ==
-        Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
+        Seq("id", "name", AutoCdcReservedNames.cdcMetadataColName)
     )
     checkAnswer(
       df = result,
@@ -785,7 +785,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     // in which the user listed columns in IncludeColumns. The CDC metadata 
column is appended
     // last as always.
     assert(result.schema.fieldNames.toSeq ==
-      Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+      Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName))
 
     checkAnswer(
       df = result,
@@ -800,7 +800,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
       // Even if a column is created with backticks via DDL, those backticks 
are consumed by Spark
       // before resolving the schema; they won't show up in the schema field.
       .add("user.id", StringType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
 
     val batch = microbatchOf(schema)(
       Row(1, "u-100", Row(null, 10L))
@@ -826,7 +826,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     val result = processor.projectTargetColumnsOntoMicrobatch(batch)
 
     assert(result.schema.fieldNames.toSeq ==
-      Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName))
+      Seq("id", "user.id", AutoCdcReservedNames.cdcMetadataColName))
     checkAnswer(
       df = result,
       expectedAnswer = Row(1, "u-100", Row(null, 10L))
@@ -860,7 +860,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
       // Output column names follow the microbatch schema's casing, not the 
casing in the user's
       // columnSelection. The CDC metadata column is appended last as always.
       assert(result.schema.fieldNames.toSeq ==
-        Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+        Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName))
       checkAnswer(
         df = result,
         expectedAnswer = Row(1, 30, Row(null, 10L))
@@ -880,7 +880,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     // Data column.
     .add("value", StringType)
     // CDC metadata column.
-    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+    .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
 
   /**
    * Schema for the auxiliary input to 
[[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests.
@@ -893,7 +893,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     // Key column.
     .add("id", IntegerType)
     // CDC metadata column.
-    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+    .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
 
   test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts 
when a matching " +
     "tombstone exists for the same key") {
@@ -1015,7 +1015,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     val schema = new StructType()
       .add("region", StringType)
       .add("customer_id", IntegerType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
 
     val microbatch = microbatchOf(schema)(
       Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
@@ -1051,7 +1051,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
   test("applyTombstonesToMicrobatch supports backticked key names containing a 
literal dot") {
     val schema = new StructType()
       .add("user.id", IntegerType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
 
     val microbatch = microbatchOf(schema)(
       Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
index 76790847ede5..bb8043e720c6 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
@@ -41,12 +41,12 @@ class Scd1ForeachBatchHandlerSuite
 
   private val auxiliarySchema = new StructType()
     .add("id", IntegerType)
-    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+    .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
 
   private val targetSchema = new StructType()
     .add("id", IntegerType)
     .add("value", StringType)
-    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+    .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
 
   private val processor = Scd1BatchProcessor(
     changeArgs = ChangeArgs(
@@ -155,11 +155,11 @@ class Scd1ForeachBatchHandlerSuite
     val compositeAuxSchema = new StructType()
       .add("country", StringType)
       .add("city", StringType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
     val compositeTargetSchema = new StructType()
       .add("country", StringType)
       .add("city", StringType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
 
     val compositeProcessor = Scd1BatchProcessor(
       changeArgs = ChangeArgs(
@@ -492,12 +492,12 @@ class Scd1ForeachBatchHandlerSuite
     val compositeAuxSchema = new StructType()
       .add("country", StringType)
       .add("city", StringType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
     val compositeTargetSchema = new StructType()
       .add("country", StringType)
       .add("city", StringType)
       .add("population", LongType)
-      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+      .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
 
     val compositeProcessor = Scd1BatchProcessor(
       changeArgs = ChangeArgs(
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala
new file mode 100644
index 000000000000..f02986320ab6
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F, QueryTest, Row}
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class Scd2BatchProcessorSuite extends QueryTest with SharedSparkSession {
+
+  /**
+   * Materializes the given rows as a [[DataFrame]] that carries exactly the supplied schema,
+   * for use as the microbatch input of a test.
+   */
+  private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = {
+    val rowRdd = spark.sparkContext.parallelize(rows)
+    spark.createDataFrame(rowRdd, schema)
+  }
+
+  // =============== preprocessMicrobatch tests ===============
+
+  test("preprocessMicrobatch appends framework columns __START_AT, __END_AT, " +
+    "_cdc_metadata at the end of the schema in that order") {
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+    val microbatch = microbatchOf(inputSchema)(Row(1, 10L, "a"))
+
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+
+    // The three input columns come first, followed by the framework columns in a fixed order.
+    val frameworkColumns = Seq(
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+    val expectedColumns = Seq("id", "seq", "value") ++ frameworkColumns
+
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+  }
+
+  test("preprocessMicrobatch returns an empty DataFrame with the full preprocessed schema") {
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+    // No rows are supplied: only the schema is available to preprocessing.
+    val emptyMicrobatch = microbatchOf(inputSchema)()
+
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+    val preprocessed = processor.preprocessMicrobatch(emptyMicrobatch)
+
+    val expectedColumns = Seq(
+      "id", "seq", "value",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+
+    // The output has no rows, yet still exposes every framework column.
+    assert(preprocessed.collect().isEmpty)
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+  }
+
+  test("preprocessMicrobatch stamps __START_AT, __END_AT, and __RECORD_START_AT correctly " +
+    "across delete and upsert events for the same key") {
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+      .add("is_delete", BooleanType)
+
+    // Two upserts followed by a delete, all for key 1. The expected output below holds one row
+    // per input event, so this test also pins that preprocessing does not deduplicate by key.
+    val microbatch = microbatchOf(inputSchema)(
+      Row(1, 10L, "first-upsert", false),
+      Row(1, 20L, "second-upsert", false),
+      Row(1, 30L, null, true)
+    )
+
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      deleteCondition = Some(F.col("is_delete"))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+
+    // Expected framework values, appended after the four input columns:
+    //   - __START_AT: the sequencing value, on every row.
+    //   - __END_AT: the sequencing value on the delete row; null on the upsert rows.
+    //   - __RECORD_START_AT (inside the metadata struct): the sequencing value, on every row.
+    val expectedRows = Seq(
+      Row(1, 10L, "first-upsert", false, 10L, null, Row(10L)),
+      Row(1, 20L, "second-upsert", false, 20L, null, Row(20L)),
+      Row(1, 30L, null, true, 30L, 30L, Row(30L))
+    )
+    checkAnswer(df = processor.preprocessMicrobatch(microbatch), expectedAnswer = expectedRows)
+  }
+
+  test("preprocessMicrobatch preserves byte-identical full-event duplicates") {
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+      .add("is_delete", BooleanType)
+
+    // The same event twice: identical key, sequencing value, data and delete flag. This test
+    // expects preprocessing to pass both copies through; collapsing such duplicates is left to
+    // the downstream reconciliation step rather than to preprocessing.
+    val duplicatedEvent = Row(1, 10L, "alice", false)
+    val microbatch = microbatchOf(inputSchema)(duplicatedEvent, duplicatedEvent)
+
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      deleteCondition = Some(F.col("is_delete"))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+
+    // Each copy comes out once, extended with the framework columns.
+    val expectedRow = Row(1, 10L, "alice", false, 10L, null, Row(10L))
+    checkAnswer(
+      df = processor.preprocessMicrobatch(microbatch),
+      expectedAnswer = Seq(expectedRow, expectedRow)
+    )
+  }
+
+  test("preprocessMicrobatch leaves __END_AT null on every row when deleteCondition is None") {
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+    val microbatch = microbatchOf(inputSchema)(
+      Row(1, 10L, "a"),
+      Row(2, 20L, "b")
+    )
+
+    // No delete condition is configured for this flow.
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      deleteCondition = None
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+
+    // Only the end-at column is inspected; it must be null for both input rows.
+    val endAtOnly = processor
+      .preprocessMicrobatch(microbatch)
+      .select(F.col(Scd2BatchProcessor.endAtColName))
+    checkAnswer(df = endAtOnly, expectedAnswer = Seq(Row(null), Row(null)))
+  }
+
+  test("preprocessMicrobatch treats null deleteCondition results as upsert " +
+    "(__END_AT stays null)") {
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+
+    // The single row carries a null is_delete, so the delete condition evaluates to null for
+    // it. The expectation below is that such a row is handled like an upsert.
+    val microbatch = microbatchOf(inputSchema)(Row(1, 10L, null))
+
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      deleteCondition = Some(F.col("is_delete"))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+
+    val endAtOnly = processor
+      .preprocessMicrobatch(microbatch)
+      .select(F.col(Scd2BatchProcessor.endAtColName))
+    checkAnswer(df = endAtOnly, expectedAnswer = Seq(Row(null)))
+  }
+
+  test("preprocessMicrobatch evaluates an arbitrary sequencing expression per-row") {
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("alt_seq", LongType)
+      .add("value", StringType)
+
+    // The sequencing expression below is greatest(seq, alt_seq): a function over two columns
+    // rather than a bare column reference. Each row is annotated with the value it yields.
+    val microbatch = microbatchOf(inputSchema)(
+      Row(1, 10L, 30L, "row1"), // greatest(10, 30) = 30
+      Row(2, 40L, 20L, "row2") // greatest(40, 20) = 40
+    )
+
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.greatest(F.col("seq"), F.col("alt_seq")),
+      storedAsScdType = ScdType.Type2
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+    // Dotted path to the record-start field nested inside the CDC metadata struct.
+    val recordStartAtPath = Seq(
+      AutoCdcReservedNames.cdcMetadataColName,
+      Scd2BatchProcessor.recordStartAtFieldName
+    ).mkString(".")
+
+    // Both framework values must equal the per-row result of the sequencing expression.
+    val stampedColumns = preprocessed.select(
+      F.col(Scd2BatchProcessor.startAtColName),
+      F.col(recordStartAtPath)
+    )
+    checkAnswer(
+      df = stampedColumns,
+      expectedAnswer = Seq(
+        Row(30L, 30L),
+        Row(40L, 40L)
+      )
+    )
+  }
+
+  /**
+   * Input schema shared by the columnSelection tests, in this order: `id` (used as the key),
+   * `name`, `age`, and `seq` (used as the sequencing column).
+   */
+  private val multiUserColSchema: StructType = StructType(Seq(
+    StructField("id", IntegerType),
+    StructField("name", StringType),
+    StructField("age", IntegerType),
+    StructField("seq", LongType)
+  ))
+
+  test("preprocessMicrobatch keeps every user column when columnSelection is None") {
+    val microbatch = microbatchOf(multiUserColSchema)(Row(1, "alice", 30, 10L))
+
+    // No column selection is configured for this flow.
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      columnSelection = None
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+    // All four input columns are retained, followed by the framework columns.
+    val frameworkColumns = Seq(
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+    val expectedColumns = Seq("id", "name", "age", "seq") ++ frameworkColumns
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+  }
+
+  test("preprocessMicrobatch retains framework columns even when IncludeColumns omits them") {
+    val microbatch = microbatchOf(multiUserColSchema)(Row(1, "alice", 30, 10L))
+
+    // The include list names only user columns; no framework column appears in it.
+    val includedColumns = Seq("id", "age").map(UnqualifiedColumnName(_))
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      columnSelection = Some(ColumnSelection.IncludeColumns(includedColumns))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+    // Only the included user columns remain, and the framework columns still follow them.
+    val expectedColumns = Seq(
+      "id", "age",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+    checkAnswer(
+      df = preprocessed,
+      expectedAnswer = Seq(Row(1, 30, 10L, null, Row(10L)))
+    )
+  }
+
+  test("preprocessMicrobatch drops user columns listed in ExcludeColumns; " +
+    "framework columns survive") {
+    val microbatch = microbatchOf(multiUserColSchema)(Row(1, "alice", 30, 10L))
+
+    // Exclude a single user column, `name`.
+    val excludedColumns = Seq(UnqualifiedColumnName("name"))
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      columnSelection = Some(ColumnSelection.ExcludeColumns(excludedColumns))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+    // `name` is gone; the other user columns keep their order and the framework columns follow.
+    val expectedColumns = Seq(
+      "id", "age", "seq",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+    checkAnswer(
+      df = preprocessed,
+      expectedAnswer = Seq(Row(1, 30, 10L, 10L, null, Row(10L)))
+    )
+  }
+
+  test("preprocessMicrobatch preserves the microbatch schema's user-column order, " +
+    "ignoring the order of IncludeColumns") {
+    val microbatch = microbatchOf(multiUserColSchema)(Row(1, "alice", 30, 10L))
+
+    // The include list is deliberately (age, id), the reverse of the schema order (id, age).
+    val includedColumns = Seq("age", "id").map(UnqualifiedColumnName(_))
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      columnSelection = Some(ColumnSelection.IncludeColumns(includedColumns))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+    // The output follows the microbatch schema (id before age) rather than the include-list
+    // order, with the framework columns appended after the user columns.
+    val expectedColumns = Seq(
+      "id", "age",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+  }
+
+  test("preprocessMicrobatch resolves columnSelection case-insensitively " +
+    "when SQLConf.CASE_SENSITIVE=false") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val microbatch = microbatchOf(multiUserColSchema)(Row(1, "alice", 30, 10L))
+
+      // The include list spells the columns in upper case; the schema uses lower case.
+      val includedColumns = Seq("ID", "AGE").map(UnqualifiedColumnName(_))
+      val scd2Args = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        columnSelection = Some(ColumnSelection.IncludeColumns(includedColumns))
+      )
+      val processor =
+        Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+      val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+      // Output names use the casing of the microbatch schema, not of the include list.
+      val expectedColumns = Seq(
+        "id", "age",
+        Scd2BatchProcessor.startAtColName,
+        Scd2BatchProcessor.endAtColName,
+        AutoCdcReservedNames.cdcMetadataColName
+      )
+      assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+    }
+  }
+
+  test("preprocessMicrobatch handles backticked column names containing a literal dot") {
+    // The schema field is named with the raw dotted name and no backticks; the backticks are
+    // used only in the column selection below.
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("user.id", StringType)
+      .add("seq", LongType)
+    val microbatch = microbatchOf(inputSchema)(Row(1, "u-100", 10L))
+
+    val includedColumns = Seq(
+      UnqualifiedColumnName("id"),
+      UnqualifiedColumnName("`user.id`")
+    )
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      columnSelection = Some(ColumnSelection.IncludeColumns(includedColumns))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+    // The dotted column is kept under its raw name, followed by the framework columns.
+    val expectedColumns = Seq(
+      "id", "user.id",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+    checkAnswer(
+      df = preprocessed,
+      expectedAnswer = Seq(Row(1, "u-100", 10L, null, Row(10L)))
+    )
+  }
+
+  test("preprocessMicrobatch correctly populates framework columns even when ExcludeColumns " +
+    "drops the columns referenced by sequencing and deleteCondition") {
+    // `seq` feeds the sequencing expression and `is_delete` feeds the delete condition, yet
+    // both are listed in ExcludeColumns below and so must not appear in the output.
+    val inputSchema = new StructType()
+      .add("id", IntegerType)
+      .add("value", StringType)
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+
+    // An upsert at sequence 10 and a delete at sequence 20, both for key 1.
+    val microbatch = microbatchOf(inputSchema)(
+      Row(1, "alice", 10L, false),
+      Row(1, null, 20L, true)
+    )
+
+    val excludedColumns = Seq("seq", "is_delete").map(UnqualifiedColumnName(_))
+    val scd2Args = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type2,
+      deleteCondition = Some(F.col("is_delete")),
+      columnSelection = Some(ColumnSelection.ExcludeColumns(excludedColumns))
+    )
+    val processor = Scd2BatchProcessor(changeArgs = scd2Args, resolvedSequencingType = LongType)
+
+    // The framework columns are expected to carry values derived from seq / is_delete even
+    // though neither source column survives the final projection.
+    val preprocessed = processor.preprocessMicrobatch(microbatch)
+
+    val expectedColumns = Seq(
+      "id", "value",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )
+    assert(preprocessed.schema.fieldNames.toSeq == expectedColumns)
+
+    val expectedRows = Seq(
+      Row(1, "alice", 10L, null, Row(10L)),
+      Row(1, null, 20L, 20L, Row(20L))
+    )
+    checkAnswer(df = preprocessed, expectedAnswer = expectedRows)
+  }
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
index 302ff789c12d..9793605e884d 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.classic.DataFrame
 import 
org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.pipelines.autocdc.{
+  AutoCdcReservedNames,
   ChangeArgs,
   ColumnSelection,
   Scd1BatchProcessor,
@@ -146,7 +147,7 @@ trait AutoCdcGraphExecutionTestMixin extends 
BeforeAndAfterEach {
    * Assumes sequence type is BIGINT (Long).
    */
   protected val cdcMetadataDdl: String = {
-    val col = Scd1BatchProcessor.cdcMetadataColName
+    val col = AutoCdcReservedNames.cdcMetadataColName
     val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName
     val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName
     s"$col STRUCT<$del:BIGINT,$ups:BIGINT> NOT NULL"
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
index 3453235cbae8..bbd8f7c8dbf9 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -21,8 +21,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.functions
 import org.apache.spark.sql.pipelines.autocdc.{
+  AutoCdcReservedNames,
   ColumnSelection,
-  Scd1BatchProcessor,
   UnqualifiedColumnName
 }
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
@@ -144,7 +144,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
 
     // The auxiliary table only contains keys and the metadata column, hence "name" should not be
     // included.
-    assert(auxSchema.fieldNames.toSeq == Seq("id", Scd1BatchProcessor.cdcMetadataColName))
+    assert(auxSchema.fieldNames.toSeq == Seq("id", AutoCdcReservedNames.cdcMetadataColName))
     assert(getAuxTableKeyColumnNames(target = "target") == Seq("id"))
   }
 
@@ -177,7 +177,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
 
     val auxSchema = spark.table(auxTableNameFor("target")).schema
     assert(auxSchema.fieldNames.toSeq ==
-      Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName))
+      Seq("region", "id", AutoCdcReservedNames.cdcMetadataColName))
     assert(getAuxTableKeyColumnNames(target = "target") == Seq("region", "id"))
   }
 
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
index 46f8ee47db02..a5f3a13a012a 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.pipelines.graph
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.functions
-import org.apache.spark.sql.pipelines.autocdc.Scd1BatchProcessor
+import org.apache.spark.sql.pipelines.autocdc.AutoCdcReservedNames
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -147,8 +147,8 @@ class AutoCdcScd1TargetTableDurabilitySuite
 
     val schema = spark.table(s"$catalog.$namespace.target").schema
     assert(
-      schema.fieldNames.contains(Scd1BatchProcessor.cdcMetadataColName),
-      s"Target must have ${Scd1BatchProcessor.cdcMetadataColName} after first AutoCDC run; " +
+      schema.fieldNames.contains(AutoCdcReservedNames.cdcMetadataColName),
+      s"Target must have ${AutoCdcReservedNames.cdcMetadataColName} after first AutoCDC run; " +
       s"got ${schema.fieldNames.toSeq}"
     )
     checkAnswer(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to