[ https://issues.apache.org/jira/browse/SPARK-57152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Torres reassigned SPARK-57152:
-----------------------------------

    Assignee: Anish Mahto

> Implement SCD2 Batch Processor; Find Affected Aux/Target Table Rows
> -------------------------------------------------------------------
>
>                 Key: SPARK-57152
>                 URL: https://issues.apache.org/jira/browse/SPARK-57152
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Declarative Pipelines
>    Affects Versions: 4.3.0
>            Reporter: Anish Mahto
>            Assignee: Anish Mahto
>            Priority: Major
>              Labels: pull-request-available
>
> {*}Preamble{*}:
> The SCD type 2 flow is a foreachBatch streaming query on an input change data 
> feed. It is responsible for reconciling the incoming change data onto a 
> target table that follows SCD2 replication semantics.
> SCD2 flows also maintain an "auxiliary" table to track the state of events 
> that arrive early and out of order. Each microbatch must also be reconciled 
> against this auxiliary table, and must update the auxiliary table's state 
> appropriately for future microbatches.
>  
> *Find Affected Aux/Target Table Rows*
> After preprocessing the microbatch so that each incoming row has its 
> startAt, endAt, and recordStartAt projected, the next step in reconciliation 
> is determining which _existing_ rows in the auxiliary and target tables 
> might be affected by the incoming rows, or might themselves affect the 
> incoming rows.
> A no-op upsert run row in the auxiliary table can be affected by the 
> microbatch if an incoming row makes it no longer a no-op (i.e. the microbatch 
> delivers an interleaving row that does change history-tracked columns). A 
> tombstone in the auxiliary table can affect an incoming row if it now matches 
> an upsert in the microbatch.
> A row in the target table can be affected by the microbatch if an incoming 
> upsert makes the target table's row a no-op upsert, or an incoming 
> delete/upsert event terminates an existing row in the target table. An active 
> row (endAt=null) in the target table could become terminated, or an existing 
> closed row in the target table could become bisected. Conversely, existing 
> rows in the target table can determine the point at which an incoming upsert 
> row should be considered closed.
> We take a practical, conservative approach to selecting the set of rows that 
> could possibly affect or be affected by the microbatch. Per key, we retrieve 
> *all* existing rows whose startAt comes after the earliest sequence in the 
> incoming microbatch, as well as the nearest existing row in each of the 
> target and auxiliary tables that comes _before_ that sequence.
> This is in contrast to performing a very complex and expensive join to 
> determine exactly which rows are definitively affected by, or affecting, the 
> microbatch. In practice it is uncommon to receive very old events out of 
> order, so pulling in all existing rows that come after the oldest row in the 
> microbatch will generally yield a very small result set.
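
The per-key selection rule described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the actual Spark implementation: the row shapes (`ExistingRow`, `IncomingRow`), the `start_at` field, and the function `select_candidate_rows` are hypothetical; plain lists stand in for DataFrames; and sequence values are assumed to be comparable integers. Rows whose startAt equals the earliest incoming sequence are treated as coming "after" it, which errs on the conservative side.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class ExistingRow:
    # Hypothetical shape of an existing row; field names are illustrative only.
    table: str      # "target" or "aux"
    key: str        # SCD2 key column(s), flattened to a single value
    start_at: int   # sequence value at which this row's version starts


@dataclass(frozen=True)
class IncomingRow:
    # Hypothetical shape of a preprocessed microbatch row.
    key: str
    start_at: int


def select_candidate_rows(
    existing: Iterable[ExistingRow], incoming: Iterable[IncomingRow]
) -> List[ExistingRow]:
    """Conservatively select existing rows that may affect, or be affected by, a microbatch.

    Per key: every existing row with start_at >= the earliest incoming start_at,
    plus, per table, the single nearest existing row strictly before that sequence.
    """
    # Earliest incoming sequence per key.
    earliest: Dict[str, int] = {}
    for row in incoming:
        if row.key not in earliest or row.start_at < earliest[row.key]:
            earliest[row.key] = row.start_at

    selected: List[ExistingRow] = []
    # Nearest preceding row, tracked separately for each (key, table) pair.
    preceding: Dict[Tuple[str, str], ExistingRow] = {}
    for row in existing:
        bound = earliest.get(row.key)
        if bound is None:
            continue  # key not touched by this microbatch
        if row.start_at >= bound:
            selected.append(row)
        else:
            slot = (row.key, row.table)
            best = preceding.get(slot)
            if best is None or row.start_at > best.start_at:
                preceding[slot] = row
    selected.extend(preceding.values())
    return selected


# Usage: key "a" has incoming rows at sequences 5 and 8; key "b" is untouched.
existing_rows = [
    ExistingRow("target", "a", 1),
    ExistingRow("target", "a", 3),
    ExistingRow("target", "a", 7),
    ExistingRow("aux", "a", 2),
    ExistingRow("aux", "a", 9),
    ExistingRow("target", "b", 4),
]
incoming_rows = [IncomingRow("a", 5), IncomingRow("a", 8)]
result = select_candidate_rows(existing_rows, incoming_rows)
# Selects target@3 and aux@2 (nearest preceding) plus target@7 and aux@9.
```

In Spark, this would presumably be expressed as an aggregation of the earliest incoming sequence per key joined back against the auxiliary and target tables, rather than a Python loop; the real implementation may differ.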



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
