[ https://issues.apache.org/jira/browse/SPARK-56870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-56870:
-----------------------------------
Labels: pull-request-available (was: )
> Implement SCD1 Batch Processor; Extend Microbatch with CDC Metadata
> -------------------------------------------------------------------
>
> Key: SPARK-56870
> URL: https://issues.apache.org/jira/browse/SPARK-56870
> Project: Spark
> Issue Type: Sub-task
> Components: Declarative Pipelines
> Affects Versions: 4.2.0, 4.3.0
> Reporter: Anish Mahto
> Priority: Major
> Labels: pull-request-available
>
> {*}Preamble{*}:
> The SCD type 1 flow is a foreachBatch streaming query on an input
> change-data-feed, and is responsible for reconciling the incoming change data
> onto some target table that follows SCD1 replication semantics.
> SCD1 flows also maintain an "auxiliary" table to keep track of the state of
> early-arriving, out-of-order events. Each microbatch will need to reconcile
> against this auxiliary table as well, and update the auxiliary table's state
> appropriately for future microbatches.
>
> {*}Extend Microbatch with CDC Metadata{*}:
> After deduplication, all of the incoming rows can be classified as either a
> delete event or an upsert event (mutually exclusive), and there's at most one
> per key.
> If we identify a row as a delete event, remember its sequencing as its
> `deleteVersion`. If we identify a row as an upsert event, remember its
> sequencing as its `upsertVersion`. That is, `deleteVersion`/`upsertVersion`
> encode both the sequencing for the row as well as the row classification
> (delete or upsert).
> We need to persist this encoded information now, because in future stages we
> may drop the columns that `deleteCondition` needed to do the classification
> in the first place, depending on which columns were selected by
> `ChangeArgs.columnSelection`.
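
The classification described above can be modeled in a few lines of plain Python. This is a minimal sketch, not the Spark implementation: rows are dicts, and `cdc_versions`, `delete_condition` (as a Python predicate), and `sequence_of` are hypothetical names chosen for illustration. It only shows that exactly one of `deleteVersion`/`upsertVersion` is set per deduplicated row.

```python
from typing import Any, Callable, Dict, Optional, Tuple

Row = Dict[str, Any]


def cdc_versions(
    row: Row,
    delete_condition: Callable[[Row], bool],  # hypothetical stand-in for `deleteCondition`
    sequence_of: Callable[[Row], Any],        # hypothetical: extracts the row's sequencing
) -> Tuple[Optional[Any], Optional[Any]]:
    """Return (deleteVersion, upsertVersion); exactly one of the two is not None."""
    seq = sequence_of(row)
    if delete_condition(row):
        # Delete event: the sequencing is remembered as deleteVersion.
        return (seq, None)
    # Upsert event: the sequencing is remembered as upsertVersion.
    return (None, seq)


# Assumed input: an already-deduplicated microbatch with at most one row per key.
rows = [
    {"id": 1, "op": "DELETE", "seq": 7},
    {"id": 2, "op": "UPSERT", "seq": 9},
]
versions = [
    cdc_versions(r, lambda r: r["op"] == "DELETE", lambda r: r["seq"])
    for r in rows
]
```

Because the pair encodes both the classification and the sequencing, later stages in this model no longer need the `op` column to tell deletes from upserts.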
>
> *Where is the CDC Metadata stored?*
> Within the microbatch, we append a `_cdc_metadata` struct column that stores
> the `deleteVersion` and `upsertVersion`.
> This `_cdc_metadata` column will eventually also land in the persisted target
> and auxiliary tables, which are the artifacts of an AutoCDC flow. This column
> represents operational metadata that the AutoCDC flow has tagged a row with,
> and is necessary for out-of-order correctness of the SCD decomposition.
> Users will not be able to opt out of persisting this column in the target
> table using `ChangeArgs.columnSelection`, as it is necessary for correctness.
> The column will not have a stable public contract, and users should make no
> assumptions about its contents.
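
As a rough illustration of why the metadata has to be attached before column selection, here is a plain-Python sketch (not the Spark code). The helpers `attach_metadata` and `select_columns` are hypothetical, rows are modeled as dicts, and the struct is modeled as a nested dict; the real column layout is explicitly not a stable contract.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

CDC_METADATA = "_cdc_metadata"


def attach_metadata(row: Row, is_delete: bool, seq: Any) -> Row:
    """Append a `_cdc_metadata` struct (modeled as a dict) to a copy of the row."""
    tagged = dict(row)
    tagged[CDC_METADATA] = {
        "deleteVersion": seq if is_delete else None,
        "upsertVersion": None if is_delete else seq,
    }
    return tagged


def select_columns(row: Row, selected: List[str]) -> Row:
    """Model of a user column selection: the metadata column is always retained."""
    kept = {c: row[c] for c in selected if c in row}
    kept[CDC_METADATA] = row[CDC_METADATA]  # users cannot opt out of this column
    return kept


source_row = {"id": 1, "op": "DELETE", "seq": 7}
tagged = attach_metadata(source_row, is_delete=True, seq=7)

# The selection drops `op` and `seq`, the columns the classification relied on,
# yet the delete/upsert information survives inside `_cdc_metadata`.
projected = select_columns(tagged, ["id"])
```

In this model the projected row no longer carries `op` or `seq`, but downstream reconciliation can still read `deleteVersion` from the metadata struct.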