This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new d454aee40231 [SPARK-56249][SDP] Implement SCD1 Batch Processor; Merge
Tombstones onto Microbatch
d454aee40231 is described below
commit d454aee40231e709bcd1bbe20490e417dfe08b39
Author: AnishMahto <[email protected]>
AuthorDate: Fri May 22 16:55:02 2026 -0700
[SPARK-56249][SDP] Implement SCD1 Batch Processor; Merge Tombstones onto
Microbatch
Approved AutoCDC SPIP:
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
--------
**Preamble:**
The SCD type 1 flow is a foreachBatch streaming query on an input
change-data-feed, and is responsible for reconciling the incoming change data
onto some target table that follows SCD1 replication semantics.
SCD1 flows also maintain an "auxiliary" table to track the state of events
that arrive early (out of order). Each microbatch must be reconciled against
this auxiliary table as well, and must update the auxiliary table's state
appropriately for future microbatches.
**Merge Tombstones onto Microbatch:**
The auxiliary table produced by an SCD1 flow stores only the tombstones
accumulated from the flow's change data feed source so far.
In SCD1, a tombstone is a delete event that has not yet been overtaken by any
upsert event for the same key (i.e. an upsert event whose sequence is greater
than or equal to the delete event's sequence).
These events/rows are called tombstones because they represent delete
events that could still be relevant in closing a late-arriving upsert received
in future microbatches. But we cannot store this type of row in the target
table, as it would break the contract of what rows an SCD1 compliant replica
table contains - hence these tombstones are stored in the auxiliary table.
When a new microbatch is processed, it's possible that it contains such
late-arriving upsert events, which should be swallowed by some known
tombstone(s). We therefore left anti join the incoming microbatch with the
auxiliary table, dropping every microbatch row for which a tombstone with the
same key and a strictly greater delete sequence exists (stale late-arriving
deletes are dropped the same way).
Closes #55993 from AnishMahto/SPARK-56249-merge-tombstones-onto-microbatch.
Authored-by: AnishMahto <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit 2a78a09069cc3cb43220b0dda72d9d1146d302b0)
Signed-off-by: DB Tsai <[email protected]>
---
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 54 ++++
.../autocdc/Scd1BatchProcessorSuite.scala | 351 ++++++++++++++++++++-
2 files changed, 400 insertions(+), 5 deletions(-)
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 03aaf284f070..e80d43b11554 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -166,6 +166,52 @@ case class Scd1BatchProcessor(
)
}
+  /**
+   * Left anti-joins the microbatch with the auxiliary table, dropping every microbatch row that a
+   * known tombstone supersedes: late-arriving upserts as well as stale deletes.
+   *
+   * A microbatch row is dropped when the auxiliary table holds a row with the same key (all key
+   * columns equal) whose delete sequence is strictly greater than the microbatch row's effective
+   * sequence, i.e. the greater of its delete and upsert sequences. A row whose effective sequence
+   * ties the tombstone's delete sequence is kept. If either compared sequence is null, the
+   * comparison evaluates to null, the anti join treats that as "no match", and the row is kept.
+   *
+   * @param microbatchDf The incoming microbatch dataframe with at minimum all of the key
+   *                     columns + CDC metadata column.
+   * @param auxiliaryTableDf Dataframe representing the auxiliary table, with at minimum the key
+   *                         columns + CDC metadata column.
+   * @return A dataframe with the same schema as `microbatchDf`, containing only the rows that
+   *         remain unaffected by any known tombstone.
+   */
+  def applyTombstonesToMicrobatch(
+      microbatchDf: DataFrame,
+      auxiliaryTableDf: DataFrame): DataFrame = {
+    // Single source of truth for the aliases used to disambiguate both sides of the join.
+    val microbatchAlias = "microbatch"
+    val auxiliaryAlias = "auxiliaryTable"
+    val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName
+
+    // Effective sequence of a microbatch row: the larger of its delete and upsert sequences.
+    // `greatest` skips nulls, so a pure delete or a pure upsert uses its only non-null sequence.
+    val microbatchCdcMetadata = F.col(s"$microbatchAlias.$cdcMetadata")
+    val effectiveSeq = F.greatest(
+      Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata),
+      Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata)
+    )
+    // Only the tombstone's delete sequence participates; its upsert sequence is never read.
+    val tombstoneDeleteSeq =
+      Scd1BatchProcessor.deleteSequenceOf(F.col(s"$auxiliaryAlias.$cdcMetadata"))
+
+    // A tombstone applies only when every key column matches.
+    val keysMatch = changeArgs.keys
+      .map { k =>
+        F.col(s"$microbatchAlias.${k.quoted}") === F.col(s"$auxiliaryAlias.${k.quoted}")
+      }
+      .reduce(_ && _)
+
+    // Strict `<`: a microbatch row whose effective sequence ties the tombstone is kept. Both
+    // late-arriving upserts and late-arriving deletes are dropped.
+    val microbatchRowDeletedByTombstone = effectiveSeq < tombstoneDeleteSeq
+
+    microbatchDf.alias(microbatchAlias).join(
+      right = auxiliaryTableDf.alias(auxiliaryAlias),
+      joinExprs = keysMatch && microbatchRowDeletedByTombstone,
+      joinType = "left_anti"
+    )
+  }
+
private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit
= {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
@@ -194,6 +240,14 @@ object Scd1BatchProcessor {
private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
+  /** Extracts the delete-sequence field from a CDC metadata struct column. */
+  private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column =
+    sequenceFieldOf(cdcMetadataCol, cdcDeleteSequenceFieldName)
+
+  /** Extracts the upsert-sequence field from a CDC metadata struct column. */
+  private[autocdc] def upsertSequenceOf(cdcMetadataCol: Column): Column =
+    sequenceFieldOf(cdcMetadataCol, cdcUpsertSequenceFieldName)
+
+  /** Projects the named field out of a CDC metadata struct column. */
+  private def sequenceFieldOf(cdcMetadataCol: Column, fieldName: String): Column =
+    cdcMetadataCol.getField(fieldName)
+
/**
* Schema of the CDC metadata struct column for SCD1.
*/
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index a49c89e35755..c78dc123621b 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -41,6 +41,18 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
.add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
)
+  /**
+   * DataType of the CDC metadata column when the sequencing type is Long: a struct holding the
+   * delete sequence followed by the upsert sequence.
+   */
+  private val cdcMetadataColSchemaType: DataType = Seq(
+    Scd1BatchProcessor.cdcDeleteSequenceFieldName,
+    Scd1BatchProcessor.cdcUpsertSequenceFieldName
+  ).foldLeft(new StructType())((struct, fieldName) => struct.add(fieldName, LongType))
+
+  /**
+   * Builds a CDC metadata struct value laid out per [[cdcMetadataColSchemaType]] (delete sequence
+   * first, then upsert sequence), encoding `None` as SQL NULL.
+   */
+  private def cdcMetadataRow(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row =
+    Row.fromSeq(Seq(deleteSeq, upsertSeq).map(_.getOrElse(null)))
+
/** Build a microbatch [[DataFrame]] from explicit rows and an explicit
schema. */
private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
@@ -53,6 +65,8 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
private def columnNamesAndDataTypes(schema: StructType): Seq[(String,
DataType)] =
schema.fields.map(f => (f.name, f.dataType)).toSeq
+ // =============== deduplicateMicrobatch tests ===============
+
test("deduplicateMicrobatch keeps only the row with the largest sequence
value per key") {
val schema = new StructType()
.add("id", IntegerType)
@@ -463,6 +477,8 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
assert(columnNamesAndDataTypes(result.schema) ==
columnNamesAndDataTypes(schema))
}
+ // =============== extendMicrobatchRowsWithCdcMetadata tests ===============
+
test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or
an upsert " +
"per deleteCondition") {
val schema = new StructType()
@@ -861,11 +877,7 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
// Even if a column is created with backticks via DDL, those backticks
are consumed by Spark
// before resolving the schema; they won't show up in the schema field.
.add("user.id", StringType)
- .add(
- Scd1BatchProcessor.cdcMetadataColName,
- new StructType()
- .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
- .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType))
+ .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
val batch = microbatchOf(schema)(
Row(1, "u-100", Row(null, 10L))
@@ -932,4 +944,333 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
)
}
}
+
+  // =============== applyTombstonesToMicrobatch tests ===============
+
+  /**
+   * Schema for the microbatch input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]]
+   * tests.
+   */
+  private val applyTombstonesToMicrobatchTestMicrobatchSchema: StructType = new StructType()
+    // Key column.
+    .add("id", IntegerType)
+    // Data column.
+    .add("value", StringType)
+    // CDC metadata column.
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+  /**
+   * Schema for the auxiliary input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests.
+   *
+   * In practice for SCD1 the auxiliary table only carries key columns and the CDC metadata
+   * column -- never user data columns -- so we mirror that production-side asymmetry here,
+   * even though the function's API contract would allow a single shared schema.
+   */
+  private val applyTombstonesToMicrobatchTestAuxiliarySchema: StructType = new StructType()
+    // Key column.
+    .add("id", IntegerType)
+    // CDC metadata column.
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+  /**
+   * Builds an SCD1 processor keyed on `keyNames` for
+   * [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests.
+   *
+   * Sequencing is irrelevant for applyTombstonesToMicrobatch because it is already encoded into
+   * the CDC metadata column, so a constant sequencing expression is used.
+   */
+  private def applyTombstonesTestProcessor(keyNames: String*): Scd1BatchProcessor =
+    Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = keyNames.map(UnqualifiedColumnName(_)),
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+  test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts when a matching " +
+    "tombstone exists for the same key") {
+    // Both microbatch events have an effective sequence strictly less than the tombstone's
+    // delete sequence, so they must be anti-joined out of the microbatch regardless of whether
+    // they are deletes or upserts.
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
+      Row(1, "stale-delete", cdcMetadataRow(deleteSeq = Some(7), upsertSeq = None))
+    )
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
+
+  test("applyTombstonesToMicrobatch keeps a microbatch row whose effective sequence ties the " +
+    "tombstone's delete sequence") {
+    // The join uses strict `<`, so a microbatch row with the same effective sequence as the
+    // tombstone is kept. This is an internal tie-breaking convention for SCD1 only, and is
+    // *not* a publicly documented contract: if external callers ever start relying on it, both
+    // this test and the join condition in applyTombstonesToMicrobatch should move together.
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "tied-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10)))
+    )
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "tied-upsert", Row(null, 10L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch keeps microbatch rows whose effective sequence exceeds the " +
+    "tombstone's delete sequence") {
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "fresher-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15))),
+      Row(1, "fresher-delete", cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None))
+    )
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq(
+        Row(1, "fresher-upsert", Row(null, 15L)),
+        Row(1, "fresher-delete", Row(20L, null))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch leaves microbatch rows untouched when the tombstone targets " +
+    "a different key") {
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stays", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    // Tombstone on a different key with a much larger sequence; the key match must guard
+    // against cross-key application no matter how stale the microbatch row's sequence is.
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(2, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "stays", Row(null, 5L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch with composite keys requires every key column to match") {
+    val schema = new StructType()
+      .add("region", StringType)
+      .add("customer_id", IntegerType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+    val microbatch = microbatchOf(schema)(
+      Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
+      Row("US", 2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
+      Row("US", 99, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    // The tombstone matches every microbatch row on `region`, but only the last row on
+    // `customer_id` as well. The join condition is the AND of all key column equalities, so
+    // only the fully-matching row is dropped; partial matches are kept.
+    val auxiliary = microbatchOf(schema)(
+      Row("US", 99, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("region", "customer_id")
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq(
+        Row("US", 1, Row(null, 5L)),
+        Row("US", 2, Row(null, 5L))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch supports backticked key names containing a literal dot") {
+    val schema = new StructType()
+      .add("user.id", IntegerType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+    val microbatch = microbatchOf(schema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    val auxiliary = microbatchOf(schema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("`user.id`")
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
+
+  test("applyTombstonesToMicrobatch is a no-op when the auxiliary table is empty") {
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "kept-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
+      Row(2, "kept-delete", cdcMetadataRow(deleteSeq = Some(7), upsertSeq = None))
+    )
+
+    // Empty auxiliary: no rows means the left-anti join cannot match any microbatch row, so the
+    // microbatch passes through untouched regardless of its contents. Conceptually, there are no
+    // tombstones that could have delete-matched against incoming rows in the microbatch.
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)()
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq(
+        Row(1, "kept-upsert", Row(null, 5L)),
+        Row(2, "kept-delete", Row(7L, null))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch keeps microbatch rows when the matching aux row has a " +
+    "null deleteSequence") {
+    // SCD1's tombstone-merge invariant guarantees aux rows always have a non-null
+    // deleteSequence, but if a corrupt aux row ever does carry a null deleteSequence, the
+    // join's `<` predicate evaluates to null (SQL 3-valued logic) and the microbatch row is
+    // retained -- a safe fallback that never silently drops data.
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "kept-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "kept-upsert", Row(null, 5L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch drops a row when any matching tombstone dominates it, " +
+    "even if stale tombstones for the same key also exist") {
+    // SCD1's tombstone-merge invariant guarantees at most one tombstone per key in the
+    // auxiliary, but if multiple ever coexist for the same key, the left-anti semantics drop
+    // the microbatch row whenever *any* matching tombstone has a strictly greater
+    // deleteSequence.
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(8)))
+    )
+    // Two tombstones on key=1: one stale (deleteSeq=5, doesn't dominate the microbatch row's
+    // effective seq of 8), one fresh (deleteSeq=10, dominates). The fresh one alone is enough
+    // to drop the microbatch row.
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(5), upsertSeq = None)),
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
+
+  test("applyTombstonesToMicrobatch ignores the aux row's upsertSequence even when it is set") {
+    // SCD1's tombstone-merge invariant guarantees aux rows always have a null upsertSequence
+    // (by definition, an aux row is an unswallowed tombstone). But if a corrupt aux row ever
+    // has both fields set, only its deleteSequence is read by the join condition; the
+    // upsertSequence is never inspected, so the row continues to behave as a pure tombstone.
+    val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    // Aux row with both fields populated; only deleteSeq=10 drives the tombstone-drop decision.
+    val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = Some(20)))
+    )
+
+    val processor = applyTombstonesTestProcessor("id")
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]