This is an automated email from the ASF dual-hosted git repository.
jose-torres pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new ecf21a215ce3 [SPARK-57134][SDP] Implement SCD2 Batch Processor;
Preprocess Microbatch
ecf21a215ce3 is described below
commit ecf21a215ce34adc379853921d2370724389c354
Author: AnishMahto <[email protected]>
AuthorDate: Thu Jun 4 14:55:45 2026 -0700
[SPARK-57134][SDP] Implement SCD2 Batch Processor; Preprocess Microbatch
### What changes were proposed in this pull request?
**Preamble:**
The SCD type 2 flow is a foreachBatch streaming query on an input
change-data-feed, and is responsible for reconciling the incoming change data
onto some target table that follows SCD2 replication semantics.
SCD2 flows also maintain an "auxiliary" table to keep track of the state of
early-arriving/out-of-order events. Each microbatch will need to
reconcile against this auxiliary table as well, and update the auxiliary
table's state appropriately for future microbatches.
**Preprocess Microbatch**
For SCD2, preprocessing the microbatch is all about getting it into the right
shape: aligned both with the shape of the target table the microbatch will be merged
into and with the shape that the SCD2 standard itself demands.
That is:
- The microbatch must have start-at and end-at columns projected as per the
  SCD2 standard, to indicate that a historical/alive record was active between
  those sequence stamps.
- The microbatch will have the operational CDC metadata column projected,
  which is needed for bookkeeping and to reconcile late-arriving events.
- As per the Spark AutoCDC API, the microbatch should be projected down to just
  the user-specified column selection.
This change implements the part of the core SCD2 microbatch processor that performs
this microbatch preprocessing.
### Why are the changes needed?
To support AutoCDC SCD2 transformations, as per the approved SPIP:
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
### Does this PR introduce _any_ user-facing change?
No. New feature.
### How was this patch tested?
`Scd2BatchProcessorSuite`
### Was this patch authored or co-authored using generative AI tooling?
Co-written with Claude Opus 4.7
Closes #56208 from AnishMahto/SPARK-57134-SCD2-preprocess-microbatch.
Authored-by: AnishMahto <[email protected]>
Signed-off-by: Jose Torres <[email protected]>
(cherry picked from commit e4462556fafcc3aca42ec06896ac1ba75b52766a)
Signed-off-by: Jose Torres <[email protected]>
---
.../pipelines/autocdc/AutoCdcReservedNames.scala | 11 +
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 13 +-
.../sql/pipelines/autocdc/Scd2BatchProcessor.scala | 335 ++++++++++++++
.../apache/spark/sql/pipelines/graph/Flow.scala | 4 +-
.../spark/sql/pipelines/graph/FlowExecution.scala | 5 +-
.../autocdc/AutoCdcCatalogExecutionTestBase.scala | 4 +-
.../sql/pipelines/autocdc/AutoCdcFlowSuite.scala | 18 +-
.../autocdc/Scd1BatchProcessorMergeSuite.scala | 10 +-
.../autocdc/Scd1BatchProcessorSuite.scala | 26 +-
.../autocdc/Scd1ForeachBatchHandlerSuite.scala | 12 +-
.../autocdc/Scd2BatchProcessorSuite.scala | 515 +++++++++++++++++++++
.../graph/AutoCdcGraphExecutionTestMixin.scala | 3 +-
.../AutoCdcScd1AuxiliaryTableDurabilitySuite.scala | 6 +-
.../AutoCdcScd1TargetTableDurabilitySuite.scala | 6 +-
14 files changed, 915 insertions(+), 53 deletions(-)
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
index 2b0f8e293e76..8284441e9e2b 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcReservedNames.scala
@@ -29,4 +29,15 @@ private[pipelines] object AutoCdcReservedNames {
/** Common reserved-name prefix shared by AutoCDC internal columns and
internal tables. */
val prefix: String = "__spark_autocdc_"
+
+ /**
+ * Reserved name of the AutoCDC operational metadata column, which is projected onto every
+ * AutoCDC microbatch, auxiliary table, and target table.
+ *
+ * Shared across all SCD strategies and across the flow resolution, batch-processor, and
+ * streaming-write layers. The name is built from [[prefix]], so it resolves to
+ * `__spark_autocdc_metadata`.
+ *
+ * Note, however, that the schema of the CDC metadata column can and does differ by SCD type.
+ */
+ val cdcMetadataColName: String = s"${prefix}metadata"
}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 0656a7eb91b0..35006dc4ee21 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -143,7 +143,7 @@ case class Scd1BatchProcessor(
F.when(rowDeleteSequence.isNull,
changeArgs.sequencing).otherwise(F.lit(null))
validatedMicrobatch.withColumn(
- Scd1BatchProcessor.cdcMetadataColName,
+ AutoCdcReservedNames.cdcMetadataColName,
Scd1BatchProcessor.constructCdcMetadataCol(
deleteSequence = rowDeleteSequence,
upsertSequence = rowUpsertSequence,
@@ -175,7 +175,7 @@ case class Scd1BatchProcessor(
schema = microbatchWithCdcMetadataDf.schema,
columnSelection = Some(
ColumnSelection.ExcludeColumns(
- Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
+ Seq(UnqualifiedColumnName(AutoCdcReservedNames.cdcMetadataColName))
)
),
caseSensitive = caseSensitiveColumnComparison
@@ -197,7 +197,7 @@ case class Scd1BatchProcessor(
// select. Identifiers could have special characters such as '.'.
F.col(QuotingUtils.quoteIdentifier(colName))
}) :+ F.col(
- Scd1BatchProcessor.cdcMetadataColName
+ AutoCdcReservedNames.cdcMetadataColName
)
microbatchWithCdcMetadataDf.select(
@@ -223,7 +223,7 @@ case class Scd1BatchProcessor(
val aliasedMicrobatchDf = microbatchDf.alias("microbatch")
val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable")
- val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName
+ val cdcMetadata = AutoCdcReservedNames.cdcMetadataColName
val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata")
val effectiveSeq = F.greatest(
@@ -267,7 +267,7 @@ case class Scd1BatchProcessor(
auxiliaryTableIdentifier: TableIdentifier
): Unit = {
val auxIdentQuoted = auxiliaryTableIdentifier.quotedString
- val meta = Scd1BatchProcessor.cdcMetadataColName
+ val meta = AutoCdcReservedNames.cdcMetadataColName
// Project the reconciled microbatch down to just keys + `_cdc_metadata`;
data columns are
// irrelevant for the auxiliary table and should not be persisted.
@@ -330,7 +330,7 @@ case class Scd1BatchProcessor(
reconciledMicrobatchDf: DataFrame,
targetTableIdentifier: TableIdentifier
): Unit = {
- val meta = Scd1BatchProcessor.cdcMetadataColName
+ val meta = AutoCdcReservedNames.cdcMetadataColName
val destinationTableStr = targetTableIdentifier.quotedString
// (Re-)alias the reconciled microbatch DF for easy reference for the
remainder of the merge.
@@ -415,7 +415,6 @@ object Scd1BatchProcessor {
* enforced at [[org.apache.spark.sql.pipelines.graph.AutoCdcMergeFlow]]
construction.
*/
private[autocdc] val winningRowColName: String =
s"${AutoCdcReservedNames.prefix}winning_row"
- private[pipelines] val cdcMetadataColName: String =
s"${AutoCdcReservedNames.prefix}metadata"
private[pipelines] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[pipelines] val cdcUpsertSequenceFieldName: String = "upsertSequence"
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala
new file mode 100644
index 000000000000..60fda36fea07
--- /dev/null
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessor.scala
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{functions => F}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * Per-microbatch processor for SCD Type 2 AutoCDC flows, complying with the specified
+ * [[changeArgs]] configuration.
+ *
+ * @param changeArgs The CDC flow configuration.
+ * @param resolvedSequencingType The post-analysis [[DataType]] of the sequencing column, derived
+ *                               from the flow's resolved DataFrame at flow setup time. Every
+ *                               sequence-derived column projected by this processor is cast to
+ *                               this type.
+ */
+case class Scd2BatchProcessor(
+ changeArgs: ChangeArgs,
+ resolvedSequencingType: DataType) {
+
+ /**
+ * Preprocess a CDC microbatch into the canonical form consumed by the auxiliary- and
+ * target-table merges.
+ *
+ * Step ordering is load-bearing: the row-extension steps reference user data columns that
+ * target-column selection is allowed to drop, so selection runs last. Unlike SCD1, no per-key
+ * deduplication step is performed here - SCD2 preserves every event as part of the row's
+ * history, including byte-identical full-event duplicates.
+ *
+ * Duplicate event elimination (e.g., collapsing two identical events at the same sequence),
+ * whether across microbatches or within the same microbatch, is the responsibility of
+ * downstream reconciliation - not preprocessing.
+ *
+ * Produces a dataframe that retains every input row 1:1 - no rows added, dropped, reordered,
+ * or merged - with the following schema, in column order:
+ *   1. The user columns of `microbatchDf` that survive [[ChangeArgs.columnSelection]], in the
+ *      order they appeared in the input.
+ *   2. `__START_AT` column, populated with the row's sequence value (cast to
+ *      [[resolvedSequencingType]]).
+ *   3. `__END_AT` column, populated with the row's sequence value IFF it is a delete event,
+ *      null otherwise.
+ *   4. `__spark_autocdc_metadata` column, a struct carrying the row's `__RECORD_START_AT`.
+ *
+ * NOTE: an input column whose name collides with one of the framework columns above is
+ * currently replaced by the framework-computed column rather than rejected (see
+ * TODO(SPARK-57251) on `Scd2BatchProcessor.reservedFrameworkColNames`).
+ */
+ private[autocdc] def preprocessMicrobatch(microbatchDf: DataFrame):
DataFrame = {
+ microbatchDf
+ .transform(extendMicrobatchRowsWithStartAt)
+ .transform(extendMicrobatchRowsWithEndAt)
+ .transform(extendMicrobatchRowsWithCdcMetadata)
+ .transform(projectTargetColumnsOntoMicrobatch)
+ }
+
+ /**
+ * Stamp each microbatch row with its currently known start-at (i.e. active-from): the row's
+ * own sequence value, cast to [[resolvedSequencingType]]. The value may be revised during
+ * later reconciliation against the aux/target tables.
+ */
+ private def extendMicrobatchRowsWithStartAt(microbatchDf: DataFrame):
DataFrame = {
+ microbatchDf.withColumn(
+ colName = Scd2BatchProcessor.startAtColName,
+ col = changeArgs.sequencing.cast(resolvedSequencingType)
+ )
+ }
+
+ /**
+ * Stamp each delete-event row with its own sequence value as its end-at, since deletes are
+ * treated as instantaneous events.
+ *
+ * All other rows - non-deletes, rows whose delete condition evaluates to null, and every row
+ * when no delete condition is configured - get a null end-at, as we do not yet know whether
+ * the row represents an active upsert or a closed upsert. This will become clear in later
+ * reconciliation against the aux/target tables. The column is cast to
+ * [[resolvedSequencingType]] in all cases.
+ */
+ private def extendMicrobatchRowsWithEndAt(microbatchDf: DataFrame):
DataFrame = {
+ microbatchDf.withColumn(
+ colName = Scd2BatchProcessor.endAtColName,
+ col = (
+ changeArgs.deleteCondition match {
+ case Some(deleteCondition) =>
+ F.when(deleteCondition,
changeArgs.sequencing).otherwise(F.lit(null))
+ case None =>
+ F.lit(null)
+ }
+ ).cast(resolvedSequencingType)
+ )
+ }
+
+ /**
+ * Project the operational CDC metadata column, a struct whose `__RECORD_START_AT` field carries
+ * the row's literal event sequence. Downstream merges rely on it to preserve original event
+ * lineage regardless of how the rows' start-at/end-at values are later coalesced.
+ */
+ private def extendMicrobatchRowsWithCdcMetadata(microbatchDf: DataFrame):
DataFrame = {
+ microbatchDf.withColumn(
+ colName = AutoCdcReservedNames.cdcMetadataColName,
+ col = Scd2BatchProcessor.constructTargetCdcMetadataCol(
+ recordStartAt = changeArgs.sequencing,
+ sequencingType = resolvedSequencingType
+ )
+ )
+ }
+
+ /**
+ * Apply the user's target column selection while preserving the SCD2 framework columns; the
+ * latter are required by downstream merges and persisted to both the auxiliary and target
+ * tables, so users cannot deselect them.
+ *
+ * Requires the framework columns to already be present on the input. Column-name matching
+ * follows the session's `caseSensitiveAnalysis` setting, and the framework columns are always
+ * appended last, in the fixed order start-at, end-at, CDC metadata.
+ */
+ private def projectTargetColumnsOntoMicrobatch(
+ microbatch: DataFrame
+ ): DataFrame = {
+ val caseSensitive =
microbatch.sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+ // Strip the framework columns through the same case-aware path as the
user selection, for
+ // consistency with Scd1BatchProcessor.projectTargetColumnsOntoMicrobatch.
+ val dataSchema = ColumnSelection.applyToSchema(
+ schemaName = "microbatch",
+ schema = microbatch.schema,
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(
+
Scd2BatchProcessor.reservedFrameworkColNames.toSeq.map(UnqualifiedColumnName(_))
+ )
+ ),
+ caseSensitive = caseSensitive
+ )
+ val userSelectedDataSchema =
+ ColumnSelection.applyToSchema(
+ schemaName = "microbatch",
+ schema = dataSchema,
+ columnSelection = changeArgs.columnSelection,
+ caseSensitive = caseSensitive
+ )
+ val finalColumnsToSelect: Seq[Column] =
+ userSelectedDataSchema.fieldNames.toSeq.map(colName => {
+ // Spark drops backticks in the schema, quote all identifiers for
safety before executing
+ // select. Identifiers could have special characters such as '.'.
+ F.col(QuotingUtils.quoteIdentifier(colName))
+ }) ++ Seq(
+ F.col(Scd2BatchProcessor.startAtColName),
+ F.col(Scd2BatchProcessor.endAtColName),
+ F.col(AutoCdcReservedNames.cdcMetadataColName)
+ )
+ microbatch.select(finalColumnsToSelect: _*)
+ }
+
+}
+
+/**
+ * Concept: run of upsert events.
+ *
+ * A run is a maximal sequence of consecutive upsert events (in sorted order
by sequencing)
+ * for the same key whose tracked-history-column values are all identical. The
transition
+ * from a previous run's tail to a new run's head represents a real state
change; every
+ * subsequent event in the run is a no-op continuation that logically
coalesces with the head.
+ *
+ * Runs matter because SCD2 only emits a new visible historical row when a
+ * tracked-history column actually changes. By convention we choose that only
the tail of a
+ * run produces a visible row in the target table; the rest become hidden rows
in the aux
+ * table. Selecting the tail means the latest no-op upsert is reflected in the
target table.
+ *
+ * Example, with trackHistoryCols = [name], events for some key:
+ * (S=5, name=Alice) -> starts run head at S=5. Row lives in aux table.
+ * (S=10, name=Alice) -> no-op, adds to run at S=5. Row lives in aux table.
+ * (S=15, name=Alice) -> no-op and tail of run at S=5. Row lives in target
table with
+ * START_AT=5.
+ * (S=20, name=Charlie) -> new run head/tail (run size=1) at S=20. Row lives
in target
+ * table.
+ *
+ * Now if a new late-arriving event (S=12, name=Bob) arrives for the same key,
we have:
+ * (S=5, name=Alice) -> starts run head at S=5. Row lives in aux table.
+ * (S=10, name=Alice) -> no-op but now tail of run at S=5. Row now lives
in target
+ * table with START_AT=5.
+ * (S=12, name=Bob) -> new run head/tail (run size=1) at S=12. Row lives
in target
+ * table.
+ * (S=15, name=Alice) -> previously-visible tail converts to a new run
head at S=15. Row
+ * remains in target table, but now with START_AT=15.
+ * (S=20, name=Charlie) -> new run head at S=20. Row lives in target table.
+ *
+ * Note that if we did not track the no-op events in the aux table for the run
at S=5 before the
+ * event (S=12, name=Bob) arrived, then we would not have correctly reconciled
that the event
+ * (S=10, name=Alice) is now the visible tail of the Alice run before Bob.
+ *
+ * -------------
+ * Concept: target table.
+ *
+ * The user-consumable output table of the CDC transformation. Every row in
the target table
+ * represents the visible tail of a run (maybe size 1), carrying the run
head's START_AT and the
+ * latest row values for that run. The target table in its entirety represents
the SCD2
+ * representation of the CDC flow's source table.
+ *
+ * -------------
+ * Concept: aux table.
+ *
+ * The side state table used to track out-of-order events from the CDC source. Two classes
+ * of events are represented as rows in this table:
+ * 1. Early-arriving deletes with no matching upsert; these are considered tombstones,
+ *    and may match with a late-arriving upsert in a future microbatch.
+ * 2. Non-tail upserts of runs (the run head and any no-op continuations before the tail);
+ *    hidden rows that may reconcile as visible run tails or state-changing run heads in a
+ *    future microbatch.
+ *
+ * The aux table is an internal table: users should neither tamper with it nor treat its
+ * contents as a public contract.
+ *
+ * -------------
+ * Concept: decomposition tail.
+ *
+ * A transient and synthetic row produced by the batch processor during
reconciliation (not
+ * from the CDC source) when a previously-closed historical row [START_AT=X,
END_AT=Y] is
+ * bisected by a late-arriving event. The bisected row is split into a head
+ * [START_AT=X, END_AT=null] - inheriting the original row's data and
`__RECORD_START_AT` -
+ * and a tail [START_AT=null, END_AT=Y, `__RECORD_START_AT`=null] that carries
the original
+ * row's right boundary. The tail typically becomes the closing END_AT of a
bisecting upsert,
+ * giving it a valid right boundary in the target-table history.
+ *
+ * Decomposition tails are uniquely identified by `__RECORD_START_AT` = null -
the only row
+ * category with that property - and are never persisted in their tail form:
each is either
+ * absorbed by the next event in the affected window (dropped as redundant) or
promoted to a
+ * tombstone in the aux table if it survives reconciliation unmatched.
+ *
+ * -------------
+ * Concept: same-sequence tie-break between an upsert and a delete.
+ *
+ * When an upsert event and a delete event share the same `__RECORD_START_AT`,
the delete wins:
+ * the visible upsert is dropped (as a zero-width interval) and only the
tombstone is written
+ * to the aux table. The reverse pair (delete arriving first, then an upsert
at the same
+ * sequence) is symmetric: the tombstone closes the upsert at the same
instant, again leaving
+ * a zero-width visible interval that is dropped, and only the tombstone
survives.
+ *
+ * This tie-break is an internal contract only - we do not publicly guarantee
deterministic
+ * resolution when two events for the same key share a sequence value. Users
who care about
+ * ordering should ensure their sequencing column is unique per (key, event).
+ */
+object Scd2BatchProcessor {
+ /**
+ * Metadata field that represents the exact time (sequence) of the CDC event
that produced
+ * this row. Null only for synthetic decomposition tails.
+ */
+ private[autocdc] val recordStartAtFieldName: String = "__RECORD_START_AT"
+
+ /**
+ * What this column represents depends on which AutoCDC artifact table it is read from.
+ *
+ * In the target table:
+ *   The user-visible column representing when this row is considered active from, i.e.
+ *   this upsert run's head [[recordStartAtFieldName]].
+ * In the aux table:
+ *   If this row represents a tombstone, then the same value as [[recordStartAtFieldName]].
+ *   Else this row is a hidden, non-tail row that is part of an upsert run, and it inherits
+ *   the [[recordStartAtFieldName]] of the head of that upsert run.
+ * In a preprocessed microbatch:
+ *   Stamped with the row's own sequence value.
+ *
+ * The invariant in both tables is: startAtColName <= recordStartAtFieldName. If an event was
+ * generated at time X, it is active by time X, or earlier if it is not a run head.
+ */
+ private[autocdc] val startAtColName: String = "__START_AT"
+
+ /**
+ * What this column represents depends on which AutoCDC artifact table it is read from.
+ *
+ * In the target table:
+ *   The user-visible column representing when this row became inactive. Null IFF the row
+ *   is active: neither superseded by a state-changing upsert nor affected by a delete.
+ * In the aux table:
+ *   If this row is a tombstone, then by convention the sequence of the delete event that
+ *   produced it. Delete events are considered instantaneous in time.
+ *   Else this row is a hidden, non-tail row that is part of an upsert run, and by
+ *   convention the value will always be null.
+ * In a preprocessed microbatch:
+ *   The row's own sequence value for delete events, null for every other row.
+ */
+ private[autocdc] val endAtColName: String = "__END_AT"
+
+ /**
+ * Column names reserved by AutoCDC that are projected onto the microbatch and persisted to
+ * the auxiliary and target tables.
+ *
+ * Until the TODO below is addressed, a source column whose name collides with one of these
+ * is not rejected by this processor: preprocessing silently replaces it with the
+ * framework-computed column.
+ *
+ * TODO(SPARK-57251): validate at [[AutoCdcMergeFlow]] construction time
that the source
+ * schema and column selection do not collide with these reserved names,
so we fail fast
+ * with a user-actionable error instead of silently overwriting them at
preprocess time.
+ */
+ private val reservedFrameworkColNames: Set[String] = Set(
+ startAtColName,
+ endAtColName,
+ AutoCdcReservedNames.cdcMetadataColName
+ )
+
+ /**
+ * Schema of the CDC metadata struct column for SCD2 target table rows, and for preprocessed
+ * microbatch rows, which are built via `constructTargetCdcMetadataCol`.
+ *
+ * @param sequencingType Data type of the flow's sequencing column; used as the type of the
+ *                       `__RECORD_START_AT` field.
+ */
+ private[pipelines] def targetCdcMetadataColSchema(sequencingType: DataType):
StructType =
+ StructType(
+ Seq(
+ // The sequence value of the originating CDC event for this row.
Nullable because
+ // decomposition tails, which are transient and synthetically
constructed during
+ // reconciliation, have a null record start at.
+ StructField(recordStartAtFieldName, sequencingType, nullable = true)
+ )
+ )
+
+ /**
+ * Construct the CDC metadata struct column for SCD2 target/microbatch rows, following the
+ * exact schema and field ordering defined by [[targetCdcMetadataColSchema]].
+ *
+ * @param recordStartAt  Column holding the originating event's sequence value.
+ * @param sequencingType Sequencing type passed to [[targetCdcMetadataColSchema]]; each field
+ *                       value is cast to its type in that schema.
+ * @throws SparkException internal error, raised eagerly while building the column, if the
+ *                        schema contains a field this method does not know how to populate
+ *                        (i.e. the schema and this method have drifted apart).
+ */
+ private def constructTargetCdcMetadataCol(
+ recordStartAt: Column,
+ sequencingType: DataType
+ ): Column = {
+ val cdcMetadataFieldsInOrder =
targetCdcMetadataColSchema(sequencingType).fields.map { field =>
+ val value = field.name match {
+ case `recordStartAtFieldName` => recordStartAt
+ case other =>
+ throw SparkException.internalError(
+ s"Unable to construct SCD2 target CDC metadata column due to
unknown " +
+ s"`${other}` field."
+ )
+ }
+ value.cast(field.dataType).as(field.name)
+ }
+ F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
+ }
+}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
index f88b0cd3a1cb..dd4d1556afbf 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala
@@ -288,7 +288,7 @@ class AutoCdcMergeFlow(
// CDC operational metadata column at the end.
StructType(
userSelectedSchema.fields :+ StructField(
- Scd1BatchProcessor.cdcMetadataColName,
+ AutoCdcReservedNames.cdcMetadataColName,
Scd1BatchProcessor.cdcMetadataColSchema(sequencingType),
nullable = false
)
@@ -334,7 +334,7 @@ class AutoCdcMergeFlow(
deleteSequence = F.lit(null),
upsertSequence = F.lit(null),
sequencingType = sequencingType
- ).as(Scd1BatchProcessor.cdcMetadataColName)
+ ).as(AutoCdcReservedNames.cdcMetadataColName)
df.select(userSelectedCols :+ emptyCdcMetadataCol: _*)
case ScdType.Type2 =>
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
index c4cd358344ca..5b5ec776c074 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala
@@ -686,11 +686,12 @@ class Scd1MergeStreamingWrite(
/** CDC metadata field resolved out of the flow's augmented schema. */
private lazy val cdcMetadataField: StructField = {
val resolver = updateContext.spark.sessionState.conf.resolver
+ val cdcMetadataColName = AutoCdcReservedNames.cdcMetadataColName
flow.schema.fields
- .find(field => resolver(field.name,
Scd1BatchProcessor.cdcMetadataColName))
+ .find(field => resolver(field.name, cdcMetadataColName))
.getOrElse(
throw SparkException.internalError(
- s"CDC metadata column '${Scd1BatchProcessor.cdcMetadataColName}' was
not found in the " +
+ s"CDC metadata column '$cdcMetadataColName' was not found in the " +
s"AutoCDC flow's target table schema."
)
)
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
index 0dc0a9027660..8688df071113 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
@@ -82,7 +82,7 @@ trait AutoCdcCatalogExecutionTestBase {
}
/**
- * Schema of the [[Scd1BatchProcessor.cdcMetadataColName]] struct column for
a given
+ * Schema of the [[AutoCdcReservedNames.cdcMetadataColName]] struct column
for a given
* sequencing column type. Defaults to [[LongType]] because all current SCD1
tests use
* `Long` sequencing.
*/
@@ -92,7 +92,7 @@ trait AutoCdcCatalogExecutionTestBase {
.add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, sequencingType)
/**
- * Build a [[Row]] matching the [[Scd1BatchProcessor.cdcMetadataColName]]
struct's two fields,
+ * Build a [[Row]] matching the [[AutoCdcReservedNames.cdcMetadataColName]]
struct's two fields,
* in the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]]:
*/
protected def cdcMetadataRow[T](deleteSeq: Option[T], upsertSeq: Option[T]):
Row =
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
index cf7c9533bee9..32374f8ecb04 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala
@@ -186,7 +186,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
/** Convenience to extract the [[StructType]] of the projected
`_cdc_metadata` column. */
private def cdcMetadataStruct(schema: StructType): StructType =
-
schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+
schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType]
test(
"AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when
no " +
@@ -200,7 +200,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
.add("seq", LongType)
.add(
StructField(
- Scd1BatchProcessor.cdcMetadataColName,
+ AutoCdcReservedNames.cdcMetadataColName,
Scd1BatchProcessor.cdcMetadataColSchema(LongType),
nullable = false
)
@@ -223,7 +223,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
.add("seq", LongType)
.add(
StructField(
- Scd1BatchProcessor.cdcMetadataColName,
+ AutoCdcReservedNames.cdcMetadataColName,
Scd1BatchProcessor.cdcMetadataColSchema(LongType),
nullable = false
)
@@ -244,7 +244,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
.add("seq", LongType)
.add(
StructField(
- Scd1BatchProcessor.cdcMetadataColName,
+ AutoCdcReservedNames.cdcMetadataColName,
Scd1BatchProcessor.cdcMetadataColSchema(LongType),
nullable = false
)
@@ -270,7 +270,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with
nullable inner fields") {
val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
- val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName)
+ val metaField =
resolvedFlow.schema(AutoCdcReservedNames.cdcMetadataColName)
assert(!metaField.nullable, "_cdc_metadata column itself must be non-null")
val metaStruct = metaField.dataType.asInstanceOf[StructType]
@@ -330,7 +330,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
// The user-selected portion drops `name`; the trailing column is the SCD1
metadata.
assert(
loadedDf.schema.fieldNames.toSeq ==
- Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+ Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName)
)
}
@@ -345,7 +345,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
assert(loadedDf.schema == resolvedFlow.schema)
assert(
loadedDf.schema.fieldNames.toSeq ==
- Seq("id", "seq", Scd1BatchProcessor.cdcMetadataColName)
+ Seq("id", "seq", AutoCdcReservedNames.cdcMetadataColName)
)
}
@@ -442,7 +442,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
// Locks in the previous engine-level guard at flow-construction time. Any
future
// regression where a user-supplied CDC stream carries the reserved
metadata column name
// should fail eagerly here.
- val sourceDf =
sourceDfWithExtraColumns(Scd1BatchProcessor.cdcMetadataColName -> StringType)
+ val sourceDf =
sourceDfWithExtraColumns(AutoCdcReservedNames.cdcMetadataColName -> StringType)
checkError(
exception = intercept[AnalysisException] {
@@ -452,7 +452,7 @@ class AutoCdcFlowSuite extends QueryTest with
SharedSparkSession {
sqlState = "42710",
parameters = Map(
"caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
- "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
+ "columnName" -> AutoCdcReservedNames.cdcMetadataColName,
"schemaName" -> "changeDataFeed",
"reservedColumnNamePrefix" -> AutoCdcReservedNames.prefix
)
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
index 475d25f5aa2c..1aa2cbcd5417 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
@@ -47,13 +47,13 @@ class Scd1BatchProcessorMergeSuite
*/
private val minimalSchema: StructType = new StructType()
.add("id", IntegerType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
/** Minimal target-table shape: one key, one data column, and CDC metadata.
*/
private val targetSchema: StructType = new StructType()
.add("id", IntegerType)
.add("value", StringType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
/**
* A processor with a single key column `id`. `sequencing` is irrelevant for
@@ -85,7 +85,7 @@ class Scd1BatchProcessorMergeSuite
val withKeys = keyColumns.foldLeft(new StructType()) { case (s, (name,
dt)) =>
s.add(name, dt)
}
- withKeys.add(Scd1BatchProcessor.cdcMetadataColName,
cdcMetadataColSchemaType())
+ withKeys.add(AutoCdcReservedNames.cdcMetadataColName,
cdcMetadataColSchemaType())
}
/**
@@ -116,7 +116,7 @@ class Scd1BatchProcessorMergeSuite
.add("id", IntegerType)
.add("value", StringType)
.add(
- Scd1BatchProcessor.cdcMetadataColName,
+ AutoCdcReservedNames.cdcMetadataColName,
new StructType()
.add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
.add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
@@ -446,7 +446,7 @@ class Scd1BatchProcessorMergeSuite
// The schema always stores the backtick consumed column name, so
unticked the raw name here.
.add(rawKeyName, IntegerType)
.add("value", StringType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
createTable(
defaultTargetIdent,
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index 9432150c4016..d2c78442c476 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -33,7 +33,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
.add("name", StringType)
.add("age", IntegerType)
.add(
- Scd1BatchProcessor.cdcMetadataColName,
+ AutoCdcReservedNames.cdcMetadataColName,
new StructType()
.add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
.add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
@@ -596,7 +596,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
// Original columns are preserved in their original order, with CDC
metadata appended at
// the very end.
assert(result.schema.fieldNames.toSeq ==
- schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName)
+ schema.fieldNames.toSeq :+ AutoCdcReservedNames.cdcMetadataColName)
}
test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence
fields to " +
@@ -624,7 +624,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch)
val cdcMetadataDataType =
-
resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+
resultDf.schema(AutoCdcReservedNames.cdcMetadataColName).dataType.asInstanceOf[StructType]
assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq(
Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType,
Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType))
@@ -723,7 +723,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
val result = processor.projectTargetColumnsOntoMicrobatch(batch)
assert(result.schema.fieldNames.toSeq ==
- Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+ Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName))
checkAnswer(
df = result,
expectedAnswer = Row(1, 30, Row(null, 10L))
@@ -753,7 +753,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
assert(
result.schema.fieldNames.toSeq ==
- Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
+ Seq("id", "name", AutoCdcReservedNames.cdcMetadataColName)
)
checkAnswer(
df = result,
@@ -785,7 +785,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
// in which the user listed columns in IncludeColumns. The CDC metadata
column is appended
// last as always.
assert(result.schema.fieldNames.toSeq ==
- Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+ Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName))
checkAnswer(
df = result,
@@ -800,7 +800,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
// Even if a column is created with backticks via DDL, those backticks
are consumed by Spark
// before resolving the schema; they won't show up in the schema field.
.add("user.id", StringType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
val batch = microbatchOf(schema)(
Row(1, "u-100", Row(null, 10L))
@@ -826,7 +826,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
val result = processor.projectTargetColumnsOntoMicrobatch(batch)
assert(result.schema.fieldNames.toSeq ==
- Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName))
+ Seq("id", "user.id", AutoCdcReservedNames.cdcMetadataColName))
checkAnswer(
df = result,
expectedAnswer = Row(1, "u-100", Row(null, 10L))
@@ -860,7 +860,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
// Output column names follow the microbatch schema's casing, not the
casing in the user's
// columnSelection. The CDC metadata column is appended last as always.
assert(result.schema.fieldNames.toSeq ==
- Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+ Seq("id", "age", AutoCdcReservedNames.cdcMetadataColName))
checkAnswer(
df = result,
expectedAnswer = Row(1, 30, Row(null, 10L))
@@ -880,7 +880,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
// Data column.
.add("value", StringType)
// CDC metadata column.
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
/**
* Schema for the auxiliary input to
[[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests.
@@ -893,7 +893,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
// Key column.
.add("id", IntegerType)
// CDC metadata column.
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts
when a matching " +
"tombstone exists for the same key") {
@@ -1015,7 +1015,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
val schema = new StructType()
.add("region", StringType)
.add("customer_id", IntegerType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
val microbatch = microbatchOf(schema)(
Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
@@ -1051,7 +1051,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
test("applyTombstonesToMicrobatch supports backticked key names containing a
literal dot") {
val schema = new StructType()
.add("user.id", IntegerType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType)
val microbatch = microbatchOf(schema)(
Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
index 76790847ede5..bb8043e720c6 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
@@ -41,12 +41,12 @@ class Scd1ForeachBatchHandlerSuite
private val auxiliarySchema = new StructType()
.add("id", IntegerType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
private val targetSchema = new StructType()
.add("id", IntegerType)
.add("value", StringType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
private val processor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
@@ -155,11 +155,11 @@ class Scd1ForeachBatchHandlerSuite
val compositeAuxSchema = new StructType()
.add("country", StringType)
.add("city", StringType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
val compositeTargetSchema = new StructType()
.add("country", StringType)
.add("city", StringType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
val compositeProcessor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
@@ -492,12 +492,12 @@ class Scd1ForeachBatchHandlerSuite
val compositeAuxSchema = new StructType()
.add("country", StringType)
.add("city", StringType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
val compositeTargetSchema = new StructType()
.add("country", StringType)
.add("city", StringType)
.add("population", LongType)
- .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ .add(AutoCdcReservedNames.cdcMetadataColName, cdcMetadataColSchemaType())
val compositeProcessor = Scd1BatchProcessor(
changeArgs = ChangeArgs(
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala
new file mode 100644
index 000000000000..f02986320ab6
--- /dev/null
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd2BatchProcessorSuite.scala
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F, QueryTest, Row}
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class Scd2BatchProcessorSuite extends QueryTest with SharedSparkSession { // Unit tests for Scd2BatchProcessor.preprocessMicrobatch.
+
+  /** Build a microbatch [[DataFrame]] from explicit rows and an explicit
schema. Rows are positional and must conform to `schema`. */
+  private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
+    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+
+  // =============== preprocessMicrobatch tests ===============
+
+  test("preprocessMicrobatch appends framework columns __START_AT, __END_AT, "
+
+      "_cdc_metadata at the end of the schema in that order") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(Row(1, 10L, "a"))
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "seq", "value",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    ))
+  }
+
+  test("preprocessMicrobatch returns an empty DataFrame with the full
preprocessed schema") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)() // zero rows: only the output schema is exercised
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    assert(result.collect().isEmpty)
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "seq", "value",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    ))
+  }
+
+  test("preprocessMicrobatch stamps __START_AT, __END_AT, and
__RECORD_START_AT correctly " +
+      "across delete and upsert events for the same key") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+      .add("is_delete", BooleanType)
+
+    // All three events target the same key. SCD2 must preserve every event in
the output -
+    // unlike SCD1, no per-key deduplication is performed; this also
implicitly pins the
+    // no-dedup contract of preprocessMicrobatch.
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "first-upsert", false),
+      Row(1, 20L, "second-upsert", false),
+      Row(1, 30L, null, true)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        deleteCondition = Some(F.col("is_delete"))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    // Per-row contract for the framework columns:
+    //   - __START_AT = sequencing for every row (the active-from time)
+    //   - __END_AT   = sequencing for delete rows; null for upserts
(mutual exclusion)
+    //   - __RECORD_START_AT = sequencing for every row, regardless of delete
vs upsert
+    //     (lineage preserved into the merge step)
+    checkAnswer(
+      df = processor.preprocessMicrobatch(batch),
+      expectedAnswer = Seq(
+        Row(1, 10L, "first-upsert", false, 10L, null, Row(10L)),
+        Row(1, 20L, "second-upsert", false, 20L, null, Row(20L)),
+        Row(1, 30L, null, true, 30L, 30L, Row(30L))
+      )
+    )
+  }
+
+  test("preprocessMicrobatch preserves byte-identical full-event duplicates") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+      .add("is_delete", BooleanType)
+
+    // Two byte-identical events for the same key: same key, same sequencing,
same data, same
+    // delete flag. SCD2 preprocessing intentionally preserves every event
verbatim, including
+    // full-event duplicates. Cross-event redundancy elimination (collapsing
duplicates before
+    // they could reconcile to a zero-width visible row) is the responsibility
of downstream
+    // reconciliation, not preprocessing.
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "alice", false),
+      Row(1, 10L, "alice", false)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        deleteCondition = Some(F.col("is_delete"))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    // Both rows must survive verbatim.
+    checkAnswer(
+      df = processor.preprocessMicrobatch(batch),
+      expectedAnswer = Seq(
+        Row(1, 10L, "alice", false, 10L, null, Row(10L)),
+        Row(1, 10L, "alice", false, 10L, null, Row(10L))
+      )
+    )
+  }
+
+  test("preprocessMicrobatch leaves __END_AT null on every row when
deleteCondition is None") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "a"),
+      Row(2, 20L, "b")
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        deleteCondition = None
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.preprocessMicrobatch(batch).select(
+        F.col(Scd2BatchProcessor.endAtColName)
+      ),
+      expectedAnswer = Seq(Row(null), Row(null)) // one null end-at per input row
+    )
+  }
+
+  test("preprocessMicrobatch treats null deleteCondition results as upsert " +
+      "(__END_AT stays null)") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+
+    val batch = microbatchOf(schema)(
+      // is_delete is null - the delete condition evaluates to null, which
Spark treats as the
+      // otherwise branch, so the row is classified as an upsert.
+      Row(1, 10L, null)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        deleteCondition = Some(F.col("is_delete"))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.preprocessMicrobatch(batch).select(
+        F.col(Scd2BatchProcessor.endAtColName)
+      ),
+      expectedAnswer = Row(null)
+    )
+  }
+
+  test("preprocessMicrobatch evaluates an arbitrary sequencing expression
per-row") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("alt_seq", LongType)
+      .add("value", StringType)
+
+    // Sequencing is a function call referencing multiple columns, not a bare
identifier. Locks
+    // in that the framework columns evaluate the full expression per-row
rather than treating
+    // `sequencing` as a single column reference.
+    val batch = microbatchOf(schema)(
+      // greatest(10, 30) = 30
+      Row(1, 10L, 30L, "row1"),
+      // greatest(40, 20) = 40
+      Row(2, 40L, 20L, "row2")
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.greatest(F.col("seq"), F.col("alt_seq")),
+        storedAsScdType = ScdType.Type2
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    checkAnswer(
+      df = result.select(
+        F.col(Scd2BatchProcessor.startAtColName),
+        F.col(s"${AutoCdcReservedNames.cdcMetadataColName}." +
+          s"${Scd2BatchProcessor.recordStartAtFieldName}") // nested field of the CDC metadata struct
+      ),
+      expectedAnswer = Seq(
+        Row(30L, 30L),
+        Row(40L, 40L)
+      )
+    )
+  }
+
+  /** Schema reused by columnSelection tests: id (key), name, age, seq
(sequencing). */
+  private val multiUserColSchema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add("name", StringType)
+    .add("age", IntegerType)
+    .add("seq", LongType)
+
+  test("preprocessMicrobatch keeps every user column when columnSelection is
None") {
+    val batch = microbatchOf(multiUserColSchema)(
+      Row(1, "alice", 30, 10L)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        columnSelection = None
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "name", "age", "seq",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    ))
+  }
+
+  test("preprocessMicrobatch retains framework columns even when
IncludeColumns omits them") {
+    val batch = microbatchOf(multiUserColSchema)(
+      Row(1, "alice", 30, 10L)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        columnSelection = Some(ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
+        ))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "age",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    ))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, 30, 10L, null, Row(10L)) // id, age, startAt, endAt, cdc metadata
+    )
+  }
+
+  test("preprocessMicrobatch drops user columns listed in ExcludeColumns; " +
+      "framework columns survive") {
+    val batch = microbatchOf(multiUserColSchema)(
+      Row(1, "alice", 30, 10L)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        columnSelection = Some(ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName("name"))
+        ))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "age", "seq",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    ))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, 30, 10L, 10L, null, Row(10L)) // id, age, seq, startAt, endAt, cdc metadata
+    )
+  }
+
+  test("preprocessMicrobatch preserves the microbatch schema's user-column
order, " +
+      "ignoring the order of IncludeColumns") {
+    val batch = microbatchOf(multiUserColSchema)(
+      Row(1, "alice", 30, 10L)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        // User specifies (age, id) - intentionally different from the schema
order (id, age).
+        columnSelection = Some(ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
+        ))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    // Output column order follows the microbatch schema (id before age), not
the user's listing
+    // order in IncludeColumns. Framework columns are always appended last.
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "age",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    )) // NOTE(review): only the schema is pinned here; row values are not checked.
+  }
+
+  test("preprocessMicrobatch resolves columnSelection case-insensitively " +
+      "when SQLConf.CASE_SENSITIVE=false") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val batch = microbatchOf(multiUserColSchema)(
+        Row(1, "alice", 30, 10L)
+      )
+
+      val processor = Scd2BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("id")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type2,
+          // User columns intentionally use a different case than the schema
(id, age).
+          columnSelection = Some(ColumnSelection.IncludeColumns(
+            Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
+          ))
+        ),
+        resolvedSequencingType = LongType
+      )
+
+      val result = processor.preprocessMicrobatch(batch)
+
+      // Output column names follow the microbatch schema's casing, not the
user's casing.
+      assert(result.schema.fieldNames.toSeq == Seq(
+        "id", "age",
+        Scd2BatchProcessor.startAtColName,
+        Scd2BatchProcessor.endAtColName,
+        AutoCdcReservedNames.cdcMetadataColName
+      )) // NOTE(review): schema only; row values are not checked.
+    }
+  }
+
+  test("preprocessMicrobatch handles backticked column names containing a
literal dot") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      // Even if a column is created with backticks via DDL, those backticks
are consumed by Spark
+      // before resolving the schema; they won't show up in the schema field.
+      .add("user.id", StringType)
+      .add("seq", LongType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, "u-100", 10L)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        columnSelection = Some(ColumnSelection.IncludeColumns(
+          Seq(
+            UnqualifiedColumnName("id"),
+            UnqualifiedColumnName("`user.id`")
+          )
+        ))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.preprocessMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "user.id",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    ))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, "u-100", 10L, null, Row(10L)) // id, user.id, startAt, endAt, cdc metadata
+    )
+  }
+
+  test("preprocessMicrobatch correctly populates framework columns even when
ExcludeColumns " +
+      "drops the columns referenced by sequencing and deleteCondition") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("value", StringType)
+      // Both seq and is_delete are referenced by the flow's sequencing /
deleteCondition
+      // expressions, but the user wants them excluded from the target table.
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, "alice", 10L, false),
+      Row(1, null, 20L, true)
+    )
+
+    val processor = Scd2BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type2,
+        deleteCondition = Some(F.col("is_delete")),
+        columnSelection = Some(ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName("seq"), UnqualifiedColumnName("is_delete"))
+        ))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    // The orchestrator runs row-extension steps before column selection, so
the framework
+    // columns reference seq / is_delete fully even though the final
projection drops them.
+    val result = processor.preprocessMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq == Seq(
+      "id", "value",
+      Scd2BatchProcessor.startAtColName,
+      Scd2BatchProcessor.endAtColName,
+      AutoCdcReservedNames.cdcMetadataColName
+    ))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Seq(
+        Row(1, "alice", 10L, null, Row(10L)),
+        Row(1, null, 20L, 20L, Row(20L)) // delete row: endAt is stamped with seq
+      )
+    )
+  }
+}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
index 302ff789c12d..9793605e884d 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcGraphExecutionTestMixin.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.classic.DataFrame
import
org.apache.spark.sql.connector.catalog.SharedTablesInMemoryRowLevelOperationTableCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.pipelines.autocdc.{
+ AutoCdcReservedNames,
ChangeArgs,
ColumnSelection,
Scd1BatchProcessor,
@@ -146,7 +147,7 @@ trait AutoCdcGraphExecutionTestMixin extends
BeforeAndAfterEach {
* Assumes sequence type is BIGINT (Long).
*/
protected val cdcMetadataDdl: String = {
- val col = Scd1BatchProcessor.cdcMetadataColName
+ val col = AutoCdcReservedNames.cdcMetadataColName
val del = Scd1BatchProcessor.cdcDeleteSequenceFieldName
val ups = Scd1BatchProcessor.cdcUpsertSequenceFieldName
s"$col STRUCT<$del:BIGINT,$ups:BIGINT> NOT NULL"
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
index 3453235cbae8..bbd8f7c8dbf9 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1AuxiliaryTableDurabilitySuite.scala
@@ -21,8 +21,8 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.functions
import org.apache.spark.sql.pipelines.autocdc.{
+ AutoCdcReservedNames,
ColumnSelection,
- Scd1BatchProcessor,
UnqualifiedColumnName
}
import org.apache.spark.sql.pipelines.utils.{ExecutionTest,
TestGraphRegistrationContext}
@@ -144,7 +144,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
// The auxiliary table only contains keys and the metadata column, hence
"name" should not be
// included.
- assert(auxSchema.fieldNames.toSeq == Seq("id",
Scd1BatchProcessor.cdcMetadataColName))
+ assert(auxSchema.fieldNames.toSeq == Seq("id",
AutoCdcReservedNames.cdcMetadataColName))
assert(getAuxTableKeyColumnNames(target = "target") == Seq("id"))
}
@@ -177,7 +177,7 @@ class AutoCdcScd1AuxiliaryTableDurabilitySuite
val auxSchema = spark.table(auxTableNameFor("target")).schema
assert(auxSchema.fieldNames.toSeq ==
- Seq("region", "id", Scd1BatchProcessor.cdcMetadataColName))
+ Seq("region", "id", AutoCdcReservedNames.cdcMetadataColName))
assert(getAuxTableKeyColumnNames(target = "target") == Seq("region", "id"))
}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
index 46f8ee47db02..a5f3a13a012a 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/AutoCdcScd1TargetTableDurabilitySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.pipelines.graph
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.functions
-import org.apache.spark.sql.pipelines.autocdc.Scd1BatchProcessor
+import org.apache.spark.sql.pipelines.autocdc.AutoCdcReservedNames
import org.apache.spark.sql.pipelines.utils.{ExecutionTest,
TestGraphRegistrationContext}
import org.apache.spark.sql.test.SharedSparkSession
@@ -147,8 +147,8 @@ class AutoCdcScd1TargetTableDurabilitySuite
val schema = spark.table(s"$catalog.$namespace.target").schema
assert(
- schema.fieldNames.contains(Scd1BatchProcessor.cdcMetadataColName),
- s"Target must have ${Scd1BatchProcessor.cdcMetadataColName} after first
AutoCDC run; " +
+ schema.fieldNames.contains(AutoCdcReservedNames.cdcMetadataColName),
+ s"Target must have ${AutoCdcReservedNames.cdcMetadataColName} after
first AutoCDC run; " +
s"got ${schema.fieldNames.toSeq}"
)
checkAnswer(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]