[ 
https://issues.apache.org/jira/browse/SPARK-56870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-56870:
-----------------------------------
    Labels: pull-request-available  (was: )

> Implement SCD1 Batch Processor; Extend Microbatch with CDC Metadata
> -------------------------------------------------------------------
>
>                 Key: SPARK-56870
>                 URL: https://issues.apache.org/jira/browse/SPARK-56870
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Declarative Pipelines
>    Affects Versions: 4.2.0, 4.3.0
>            Reporter: Anish Mahto
>            Priority: Major
>              Labels: pull-request-available
>
> {*}Preamble{*}:
> The SCD type 1 flow is a foreachBatch streaming query on an input 
> change-data-feed, and is responsible for reconciling the incoming change data 
> onto some target table that follows SCD1 replication semantics.
> SCD1 flows also maintain an "auxiliary" table that tracks the state of events 
> that arrived early, i.e. out of order. Each microbatch must reconcile against 
> this auxiliary table as well, and update its state appropriately for future 
> microbatches.
>  
> {*}Extend Microbatch with CDC Metadata{*}:
> After deduplication, every incoming row can be classified as either a delete 
> event or an upsert event (mutually exclusive), and there is at most one row 
> per key.
> If we identify a row as a delete event, record its sequencing value as its 
> `deleteVersion`. If we identify a row as an upsert event, record its 
> sequencing value as its `upsertVersion`. That is, 
> `deleteVersion`/`upsertVersion` encode both the sequencing for the row and 
> the row classification (delete or upsert).
> We need to persist this encoded information now, because in future stages we 
> may drop the columns that `deleteCondition` needed to do the classification 
> in the first place, depending on which columns were selected by 
> `ChangeArgs.columnSelection`.
>  
> *Where is the CDC Metadata stored?*
> Within the microbatch, we append a `_cdc_metadata` struct column that stores 
> the `deleteVersion` and `upsertVersion`.
> This `_cdc_metadata` column will eventually also land in the persisted target 
> and auxiliary tables, which are the artifacts of an AutoCDC flow. This column 
> represents operational metadata that the AutoCDC flow has tagged a row with, 
> and is necessary for out-of-order correctness of the SCD decomposition. 
> Users will not be able to opt out of persisting this column in the target 
> table using `ChangeArgs.columnSelection`, as it is necessary for correctness. 
> The column will not have a stable public contract, and users should make no 
> assumptions about its contents.
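
For illustration only, the tagging step described in the quoted description might look roughly like the plain-Python sketch below. It is not the Spark implementation and does not use any Spark API. The row layout (`id`, `name`, `op`, `seq`), the helper names (`dedupe_latest`, `tag_cdc_metadata`, `project`), the "keep the highest sequence per key" deduplication policy, and the use of `None` for the version that does not apply are all assumptions made for this example. Only the names `_cdc_metadata`, `deleteVersion` and `upsertVersion` come from the issue text.

```python
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]


def dedupe_latest(rows: Iterable[Row], key: str, seq: str) -> List[Row]:
    """Keep one row per key: the one with the highest sequencing value.

    Assumed policy; the issue only says there is at most one row per key
    after deduplication.
    """
    latest: Dict[Any, Row] = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[seq] > latest[k][seq]:
            latest[k] = row
    return list(latest.values())


def tag_cdc_metadata(row: Row, seq: str, is_delete: Callable[[Row], bool]) -> Row:
    """Attach a `_cdc_metadata` struct that records sequencing and classification.

    Exactly one of the two versions is set (assumed encoding), so the struct
    still says "delete" or "upsert" after the classifying columns are dropped.
    """
    deleted = is_delete(row)
    meta = {
        "deleteVersion": row[seq] if deleted else None,
        "upsertVersion": None if deleted else row[seq],
    }
    return {**row, "_cdc_metadata": meta}


def project(row: Row, selected: List[str]) -> Row:
    """Apply a column selection, but always carry `_cdc_metadata` along."""
    kept = {c: row[c] for c in selected if c in row}
    kept["_cdc_metadata"] = row["_cdc_metadata"]
    return kept


# Hypothetical microbatch: two events for key 1, one delete for key 2.
batch = [
    {"id": 1, "name": "a", "op": "UPSERT", "seq": 10},
    {"id": 1, "name": "b", "op": "UPSERT", "seq": 12},
    {"id": 2, "name": "c", "op": "DELETE", "seq": 11},
]

deduped = dedupe_latest(batch, key="id", seq="seq")
tagged = [tag_cdc_metadata(r, "seq", lambda r: r["op"] == "DELETE") for r in deduped]
# The selection drops `op` and `seq`; the classification survives in the struct.
result = [project(r, ["id", "name"]) for r in tagged]
```

In this sketch the column selection removes `op` and `seq`, yet each row can still be recognised as a delete or an upsert from `_cdc_metadata` alone, which is the reason the description gives for persisting the encoded information before later stages drop columns.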



