[
https://issues.apache.org/jira/browse/SPARK-57152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jose Torres reassigned SPARK-57152:
-----------------------------------
Assignee: Anish Mahto
> Implement SCD2 Batch Processor; Find Affected Aux/Target Table Rows
> -------------------------------------------------------------------
>
> Key: SPARK-57152
> URL: https://issues.apache.org/jira/browse/SPARK-57152
> Project: Spark
> Issue Type: Sub-task
> Components: Declarative Pipelines
> Affects Versions: 4.3.0
> Reporter: Anish Mahto
> Assignee: Anish Mahto
> Priority: Major
> Labels: pull-request-available
>
> {*}Preamble{*}:
> The SCD type 2 flow is a foreachBatch streaming query on an input
> change-data-feed, and is responsible for reconciling the incoming change data
> onto some target table that follows SCD2 replication semantics.
> SCD2 flows also maintain an "auxiliary" table to keep track of the state of
> early-arriving, out-of-order events. Each microbatch will need to reconcile
> against this auxiliary table as well, and update the auxiliary table's state
> appropriately for future microbatches.
>
> *Find Affected Aux/Target Table Rows*
> After preprocessing the microbatch such that we have each incoming row's
> startAt, endAt, and recordStartAt projected, the next step in reconciliation
> is determining which _existing_ rows in the auxiliary and target tables
> might either be affected by the incoming rows or themselves affect the
> incoming rows.
> A no-op upsert run row in the auxiliary table can be affected by the
> microbatch if an incoming row makes the row no longer a no-op (i.e. the
> microbatch delivers an interleaving row that does change history-tracked
> columns). A tombstone in the auxiliary table can affect an incoming row if it
> now matches against an upsert in the microbatch.
> A row in the target table can be affected by the microbatch if an incoming
> upsert makes the target table's row a no-op upsert, or an incoming
> delete/upsert event terminates an existing row in the target table. An active
> row (endAt=null) in the target table could become terminated, or an existing
> closed row in the target table could become bisected. Conversely, existing
> rows in the target table can dictate the point from which an incoming upsert
> row should be considered closed.
> We take a practical, conservative approach in selecting the set of rows that
> could possibly affect or be affected by the microbatch. Per key we retrieve
> *all* existing rows whose startAt comes after the oldest sequence in the
> incoming microbatch, as well as the first existing row in both the target and
> auxiliary tables that comes _before_ that oldest sequence.
> This is opposed to doing a very complex and expensive join to determine which
> rows are definitively affected by/affecting the microbatch. In practice it's
> uncommon to receive very old events out of order, so pulling in all existing
> rows that come after the oldest row in the microbatch will generally yield a
> very small result set.
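
The conservative selection rule described above can be sketched in plain Python. This is only an illustration under stated assumptions, not the Spark implementation: `ExistingRow`, `select_candidate_rows`, and `min_incoming_seq` are hypothetical names, sequences are assumed to be comparable integers, and a row whose startAt equals the oldest incoming sequence is assumed to count as the "before" row. In the real flow this would presumably be expressed as DataFrame operations rather than in-memory loops, and it would be applied once to the target table and once to the auxiliary table.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, NamedTuple, Optional


class ExistingRow(NamedTuple):
    """Hypothetical shape of a row already in the target or auxiliary table."""
    key: Hashable
    start_at: int           # sequence at which this row version starts
    end_at: Optional[int]   # None means the row is still active


def select_candidate_rows(
    existing: List[ExistingRow],
    min_incoming_seq: Dict[Hashable, int],
) -> List[ExistingRow]:
    """Per key, keep every existing row that starts after the oldest incoming
    sequence, plus the single closest row that starts at or before it.

    Keys that do not appear in the microbatch are skipped entirely.
    """
    by_key = defaultdict(list)
    for row in existing:
        if row.key in min_incoming_seq:
            by_key[row.key].append(row)

    selected: List[ExistingRow] = []
    for key, rows in by_key.items():
        boundary = min_incoming_seq[key]
        ordered = sorted(rows, key=lambda r: r.start_at)
        before = [r for r in ordered if r.start_at <= boundary]
        after = [r for r in ordered if r.start_at > boundary]
        if before:
            # Assumption: ties (start_at == boundary) are treated as "before".
            selected.append(before[-1])
        selected.extend(after)
    return selected


if __name__ == "__main__":
    table = [
        ExistingRow("a", 1, 5),
        ExistingRow("a", 5, 9),
        ExistingRow("a", 9, None),
        ExistingRow("b", 2, None),
        ExistingRow("c", 1, None),
    ]
    # Oldest incoming sequence per key in a hypothetical microbatch.
    for row in select_candidate_rows(table, {"a": 6, "b": 1}):
        print(row)
```

With this sample data, key "a" (oldest incoming sequence 6) keeps the row starting at 5 as its predecessor and the row starting at 9, while the row starting at 1 is left out. Key "b" has no predecessor, so only its later row is kept, and key "c" is ignored because it does not appear in the microbatch.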