Anish Mahto created SPARK-57152:
-----------------------------------
Summary: Implement SCD2 Batch Processor; Find Affected Aux/Target
Table Rows
Key: SPARK-57152
URL: https://issues.apache.org/jira/browse/SPARK-57152
Project: Spark
Issue Type: Sub-task
Components: Declarative Pipelines
Affects Versions: 4.3.0
Reporter: Anish Mahto
*Preamble*
The SCD type 2 flow is a foreachBatch streaming query on an input
change-data-feed, and is responsible for reconciling the incoming change data
onto some target table that follows SCD2 replication semantics.
SCD2 flows also maintain an "auxiliary" table to keep track of state for
early-arriving, out-of-order events. Each microbatch must reconcile against
this auxiliary table as well, and update the auxiliary table's state
appropriately for future microbatches.
*Find Affected Aux/Target Table Rows*
After preprocessing the microbatch so that each incoming row has its startAt,
endAt, and recordStartAt projected, the next step in reconciliation is
determining which _existing_ rows in the auxiliary and target tables might
either be affected by the incoming rows or themselves affect the incoming rows.
A no-op upsert run row in the auxiliary table can be affected by the microbatch
if an incoming row makes it no longer a no-op (i.e., the microbatch delivers an
interleaving row that does change history-tracked columns). A tombstone in the
auxiliary table can affect an incoming row if it now matches against an upsert
in the microbatch.
A row in the target table can be affected by the microbatch if an incoming
upsert makes the target table's row a no-op upsert, or an incoming
delete/upsert event terminates an existing row in the target table. An active
row (endAt=null) in the target table could become terminated, or an existing
closed row in the target table could become bisected. Conversely, existing rows
in the target table can dictate the point at which an incoming upsert row
should be considered closed.
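The target-table cases above can be illustrated with a minimal sketch. It is
not the processor's implementation: the function name, the integer sequence
values, the half-open [startAt, endAt) interval semantics, and the assumption
that an incoming event never shares a sequence with an existing row's startAt
are all hypothetical choices made for illustration.

```python
from typing import Optional


def effect_on_target_row(start_at: int, end_at: Optional[int], event_seq: int) -> str:
    """Classify how an incoming event relates to one existing target row.

    Assumes (hypothetically) that rows cover [start_at, end_at), that
    end_at=None marks an active row, and that event_seq != start_at.
    """
    if event_seq < start_at:
        # The existing row begins later, so it bounds where the incoming row closes.
        return "closes incoming row"
    if end_at is None:
        # The event lands inside an active row, which becomes terminated.
        return "terminates"
    if event_seq < end_at:
        # The event lands inside a closed row, splitting it in two.
        return "bisects"
    # The event lands at or after the end of a closed row.
    return "unaffected"


print(effect_on_target_row(10, None, 15))
print(effect_on_target_row(10, 20, 15))
print(effect_on_target_row(10, 20, 5))
```

Under these assumptions the three calls classify an event inside an active
row, an event inside a closed row, and an event that precedes the row.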
We take a practical, conservative approach to selecting the set of rows that
could affect or be affected by the microbatch. Per key, we retrieve *all*
existing rows whose startAt comes after the oldest sequence in the incoming
microbatch, as well as the nearest existing row in each of the target and aux
tables that comes _before_ that oldest sequence.
This avoids a complex and expensive join to determine which rows are
definitively affected by, or affecting, the microbatch. In practice it's
uncommon for a flow to receive very old events out of order, so pulling in all
existing rows that come after the oldest row in the microbatch will generally
yield a very small result set.
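The selection rule can be sketched in plain Python, outside of Spark, to make
the per-key logic concrete. This is only an illustration under stated
assumptions: ExistingRow, select_candidate_rows, and min_seq_by_key are
hypothetical names, sequences are modelled as integers, and a row whose startAt
equals the microbatch's oldest sequence is treated as coming "after" it.

```python
from collections import defaultdict
from typing import NamedTuple


class ExistingRow(NamedTuple):
    key: str
    start_at: int  # sequence value at which this row version begins


def select_candidate_rows(existing, min_seq_by_key):
    """Return, per key, rows that may affect or be affected by the microbatch.

    min_seq_by_key maps each key in the microbatch to its oldest sequence.
    """
    by_key = defaultdict(list)
    for row in existing:
        by_key[row.key].append(row)

    selected = []
    for key, min_seq in min_seq_by_key.items():
        rows = sorted(by_key.get(key, []), key=lambda r: r.start_at)
        before = [r for r in rows if r.start_at < min_seq]
        at_or_after = [r for r in rows if r.start_at >= min_seq]
        # Keep only the closest preceding row, then everything from min_seq on.
        selected.extend(before[-1:])
        selected.extend(at_or_after)
    return selected


existing = [
    ExistingRow("k1", 1), ExistingRow("k1", 5), ExistingRow("k1", 9),
    ExistingRow("k1", 12), ExistingRow("k2", 3),
]
# The microbatch's oldest sequence for k1 is 8; k2 has no incoming rows.
candidates = select_candidate_rows(existing, {"k1": 8})
print([(r.key, r.start_at) for r in candidates])
# prints [('k1', 5), ('k1', 9), ('k1', 12)]
```

The row at startAt=1 is skipped because a nearer preceding row (startAt=5)
exists, and k2 is untouched because the microbatch carries no rows for it.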