[ 
https://issues.apache.org/jira/browse/SPARK-56683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18086924#comment-18086924
 ] 

Juliusz Sompolski commented on SPARK-56683:
-------------------------------------------

Hi [~yadavay]

I am working on guaranteed CTE reuse together with a couple of colleagues, and 
we should have those changes soon; I plan to build DSv2 source materialization 
on top of it.

I originally worked on MergeIntoMaterializeSource a couple of years ago, and 
the localCheckpoint it uses is a pain in many ways: serialization and 
deserialization are not performant, it drops out to RDD execution and thereby 
loses a lot of SQL / AQE logic, and it needs very crude retry handling when 
some RDD blocks are lost. I would like to do it the proper way in DSv2 by using 
a guaranteed reused shuffle, which would automatically pick up all the nice 
features that shuffles have: a more performant format, automatic logic to know 
what to recompute after failures based on checksums, etc.
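The core idea of source materialization, whether done through localCheckpoint 
or through a guaranteed reused shuffle, is that the source is evaluated once 
and every consumer reads that single result. Below is a toy Python sketch of 
just that idea, not Spark code: make_changing_source and materialize_once are 
hypothetical names, and an in-memory snapshot stands in for whatever Spark 
actually persists (checkpointed RDD blocks or shuffle files), so none of the 
performance or failure-recovery behavior described above is modeled.

```python
from typing import Callable, List, Tuple

Row = Tuple[int, str]  # (id, value)


def make_changing_source() -> Callable[[], List[Row]]:
    """Hypothetical source whose contents change between reads,
    standing in for a table with a concurrent writer."""
    rows: List[Row] = [(1, "x")]

    def read() -> List[Row]:
        snapshot = list(rows)
        rows.append((len(rows) + 1, "new"))  # a writer commits after every read
        return snapshot

    return read


def materialize_once(read: Callable[[], List[Row]]) -> Callable[[], List[Row]]:
    """Evaluate the source exactly once; every later consumer gets the same rows."""
    snapshot = tuple(read())
    return lambda: list(snapshot)


live = make_changing_source()
print(live() == live())      # False: two independent reads disagree

stable = materialize_once(make_changing_source())
print(stable() == stable())  # True: both consumers see one snapshot
```

In a MERGE, the two "consumers" would be the join input and the matching 
subquery; giving both the same materialized result removes the dependence on 
source determinism.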

I had hoped to have it done for Spark 4.2 but didn't make it in time; it should 
be ready and up in a PR soonish.

> MERGE INTO TABLE reads the source twice and the two reads can disagree 
> leading to data inconsistency
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-56683
>                 URL: https://issues.apache.org/jira/browse/SPARK-56683
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.2.0
>            Reporter: Juliusz Sompolski
>            Priority: Major
>
> RewriteMergeIntoTable rewrites a MERGE INTO statement into a plan that 
> references the source query in two positions: once as the streamed input to 
> the join that pairs source rows with target rows, and once inside a subquery 
> that the rewrite uses to identify which rows or groups have matching source 
> rows.
> The two positions are independent reads of the same source. When the source 
> is non-deterministic — for example, a table with concurrent writers, a 
> streaming source, or a query containing expressions like rand() — the two 
> reads can observe different sets of rows. The MERGE result is then computed 
> against an inconsistent picture of the source: rows can be filtered in or out 
> by the subquery while the join sees a different set of rows, producing 
> dropped, duplicated, or wrongly matched rows.
> The two reads of the source need to be made consistent so that both positions 
> in the rewritten plan see the same source data, regardless of source 
> determinism.
> Delta Lake resolved this in its custom MERGE implementation with 
> [https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala],
> but for DSv2 data sources implementing SupportsRowLevelOperations the 
> rewritten plan still reads the source twice, so they remain exposed to this 
> possible data inconsistency.
> This could be resolved with https://issues.apache.org/jira/browse/SPARK-56685
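To make the failure mode above concrete, here is a toy Python model, not Spark 
code, loosely patterned on a group-based (copy-on-write) rewrite with WHEN 
MATCHED THEN UPDATE and WHEN NOT MATCHED THEN INSERT: one read of the source 
decides which target groups are rewritten, and a second read supplies the rows 
for the join. The names toy_merge, before and after are hypothetical, and the 
model ignores everything except which rows each read sees.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]               # (id, value)
Target = Dict[str, Dict[int, str]]  # group (think: data file) -> {id: value}


def toy_merge(target: Target, subquery_read: List[Row], join_read: List[Row]) -> List[Row]:
    """Hypothetical toy model of a group-based MERGE that reads the source twice.

    subquery_read decides which groups get rewritten;
    join_read supplies the source rows that are actually joined.
    """
    selected_ids = {row_id for row_id, _ in subquery_read}
    source = dict(join_read)
    result: List[Row] = []
    matched: set = set()
    for rows in target.values():
        if selected_ids.isdisjoint(rows):
            # The subquery saw no matching source row: the group is copied as-is.
            result.extend(rows.items())
            continue
        for row_id, value in rows.items():
            if row_id in source:
                result.append((row_id, source[row_id]))  # WHEN MATCHED: update
                matched.add(row_id)
            else:
                result.append((row_id, value))  # unmatched target row is kept
    # WHEN NOT MATCHED: insert source rows that met no target row in a rewritten group.
    result.extend((i, v) for i, v in source.items() if i not in matched)
    return sorted(result)


target = {"f1": {1: "a"}, "f2": {2: "b"}}
before = [(1, "x")]           # source as seen by the first read
after = [(1, "x"), (2, "y")]  # a concurrent writer added id 2 before the second read

print(toy_merge(target, after, after))   # [(1, 'x'), (2, 'y')]
print(toy_merge(target, before, after))  # [(1, 'x'), (2, 'b'), (2, 'y')]
```

With consistent reads, id 2 is updated in place. When the second read sees a 
row the first read missed, group f2 is skipped by the subquery, so the old row 
survives and the new source row is inserted as well, leaving two rows for id 2: 
the "duplicated" outcome described in the issue.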



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
