Anish Mahto created SPARK-57152:
-----------------------------------

             Summary: Implement SCD2 Batch Processor; Find Affected Aux/Target 
Table Rows
                 Key: SPARK-57152
                 URL: https://issues.apache.org/jira/browse/SPARK-57152
             Project: Spark
          Issue Type: Sub-task
          Components: Declarative Pipelines
    Affects Versions: 4.3.0
            Reporter: Anish Mahto


{*}Preamble{*}:

The SCD type 2 flow is a foreachBatch streaming query on an input 
change-data-feed, and is responsible for reconciling the incoming change data 
onto some target table that follows SCD2 replication semantics.

SCD2 flows also maintain an "auxiliary" table that tracks the state of events 
received early or out of order. Each microbatch must reconcile against this 
auxiliary table as well, and update its state appropriately for future 
microbatches.

 

*Find Affected Aux/Target Table Rows*

After preprocessing the microbatch so that each incoming row has its startAt, 
endAt, and recordStartAt projected, the next step in reconciliation is 
determining which _existing_ rows in the auxiliary and target tables might 
either be affected by the incoming rows or themselves affect the incoming 
rows.

A no-op upsert run row in the auxiliary table can be affected by the microbatch 
if an incoming row makes it no longer a no-op (i.e., the microbatch delivers an 
interleaving row that does change history-tracked columns). A tombstone in the 
auxiliary table can affect an incoming row if it now matches against an upsert 
in the microbatch.

A row in the target table can be affected by the microbatch if an incoming 
upsert makes the target table's row a no-op upsert, or an incoming 
delete/upsert event terminates an existing row in the target table. An active 
row (endAt=null) in the target table could become terminated, or an existing 
closed row in the target table could become bisected. Conversely, existing rows 
in the target table can dictate the point from which an incoming upsert row 
should be considered closed.
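
The terminate/bisect distinction above can be illustrated with a small sketch. 
It is not the processor's implementation: the ExistingRow type, the 
effect_of_event helper, integer sequences, and the half-open [startAt, endAt) 
interval convention are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ExistingRow:
    # Hypothetical shape of a target-table row; only the validity
    # interval matters for this sketch.
    start_at: int
    end_at: Optional[int]  # None means the row is still active


def effect_of_event(row: ExistingRow, event_seq: int) -> str:
    """Classify how an incoming event relates to one existing row.

    Assumes half-open validity intervals [start_at, end_at).
    """
    if event_seq < row.start_at:
        # The existing row starts later, so it may bound the incoming
        # row's endAt rather than being changed itself.
        return "may_close_incoming"
    if event_seq == row.start_at:
        # Same sequence: treated as no effect in this sketch.
        return "none"
    if row.end_at is None:
        # Active row with an event after its start: it gets terminated.
        return "terminates"
    if event_seq < row.end_at:
        # Event lands strictly inside a closed row: it gets bisected.
        return "bisects"
    # Event falls at or after the row's end; the row is untouched.
    return "none"
```

For example, under these assumptions an event at sequence 15 terminates an 
active row starting at 10, and bisects a closed row spanning 10 to 20.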

We take a practical, conservative approach to selecting the set of rows that 
could affect or be affected by the microbatch. Per key, we retrieve *all* 
existing rows whose startAt comes after the earliest sequence in the incoming 
microbatch, as well as the first existing row in each of the target and 
auxiliary tables that comes _before_ that earliest sequence.
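
A minimal sketch of this selection rule in plain Python (not Spark) follows. 
It makes several assumptions: the bounding sequence is taken to be the lowest 
incoming sequence per key, rows are reduced to hypothetical (key, start_at) 
pairs, sequences are integers, and a row whose startAt equals the bound is 
treated as "after". The select_candidates helper is illustrative only.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple

# Hypothetical minimal row shape: (key, start_at).
Row = Tuple[Hashable, int]


def select_candidates(existing: List[Row], incoming: List[Row]) -> List[Row]:
    """Pick existing rows that may affect, or be affected by, the microbatch."""
    # Lowest incoming sequence per key (assumed to be the bounding sequence).
    min_seq: Dict[Hashable, int] = {}
    for key, seq in incoming:
        min_seq[key] = min(seq, min_seq.get(key, seq))

    # Only keys present in the microbatch are relevant.
    starts_by_key = defaultdict(list)
    for key, start_at in existing:
        if key in min_seq:
            starts_by_key[key].append(start_at)

    selected: List[Row] = []
    for key, starts in starts_by_key.items():
        bound = min_seq[key]
        before = [s for s in starts if s < bound]
        after = sorted(s for s in starts if s >= bound)
        if before:
            # The single closest row preceding the bound.
            selected.append((key, max(before)))
        # Every row at or after the bound.
        selected.extend((key, s) for s in after)
    return selected
```

Because the rule applies to the target and auxiliary tables separately, a 
caller would run such a helper once per table. With existing startAt values 
1, 5, 9, and 12 for a key and incoming sequences 7 and 10, the sketch selects 
the rows starting at 5, 9, and 12.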

This is opposed to doing a very complex and expensive join to determine which 
rows are definitively affected by or affecting the microbatch. In practice it's 
not common for a microbatch to receive very old events out of order, so pulling 
in all existing rows that come after the oldest row in the microbatch will 
generally yield a very small result set.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
