[ https://issues.apache.org/jira/browse/SPARK-47019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816321#comment-17816321 ]

Bruce Robbins commented on SPARK-47019:
---------------------------------------

I can reproduce this on my laptop using Spark 3.5.0 and {{--master 
"local-cluster[3,1,1024]"}}. However, I cannot reproduce it on the latest 
branch-3.5 or master.

So it seems to have been fixed, probably by SPARK-45592.
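
For reference, a minimal sketch of the invocation I used to reproduce 
(mirroring the issue's submit flags against a local-cluster master):

{code:java}
# Sketch: run the reporter's test jar on a 3-worker local-cluster
# (1 core, 1024 MB each), with broadcast joins disabled so the
# join stays a SortMergeJoin.
spark-submit \
  --class ExampleApp \
  --packages org.apache.spark:spark-avro_2.12:3.5.0 \
  --master "local-cluster[3,1,1024]" \
  --conf spark.sql.autoBroadcastJoinThreshold=-1 \
  /path/to/test.jar
{code}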


> AQE dynamic cache partitioning causes SortMergeJoin to result in data loss
> --------------------------------------------------------------------------
>
>                 Key: SPARK-47019
>                 URL: https://issues.apache.org/jira/browse/SPARK-47019
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, Spark Core
>    Affects Versions: 3.5.0
>         Environment: Tested in 3.5.0
> Reproduced on, so far:
>  * kubernetes deployment
>  * docker cluster deployment
> Local Cluster:
>  * master
>  * worker1 (2/2G)
>  * worker2 (1/1G)
>            Reporter: Ridvan Appa Bugis
>            Priority: Blocker
>              Labels: DAG, caching, correctness, data-loss, 
> dynamic_allocation, inconsistency, partitioning
>         Attachments: Screenshot 2024-02-07 at 20.09.44.png, Screenshot 
> 2024-02-07 at 20.10.07.png, eventLogs-app-20240207175940-0023.zip, 
> testdata.zip
>
>
> It seems that we have encountered an issue with Spark AQE's dynamic cache 
> partitioning that causes incorrect *count* output values and data loss.
> A similar issue could not be found, so I am creating this ticket to raise 
> awareness.
>  
> Preconditions:
>  - Set up a cluster as per the environment specification
>  - Prepare the test data (or data large enough to trigger reads by both 
> executors)
> Steps to reproduce (see the minimal example below):
>  - Read parent
>  - Self-join parent
>  - Cache + materialize parent
>  - Join parent with child
>  
> Performing a self-join over a parentDF, then caching + materializing the DF, 
> and then joining it with a childDF results in an *incorrect* count value and 
> {*}missing data{*}.
>  
> Performing a *repartition* seems to fix the issue, most probably because it 
> rearranges the underlying partitions and updates their statistics.
>  
> This behaviour is observed on a multi-worker cluster with a job running 2 
> executors (1 per worker), when the data file is large enough to be read by 
> both executors.
> It is not reproducible in local mode.
>  
> Circumvention:
> So far, the issue can be alleviated by disabling 
> _spark.sql.optimizer.canChangeCachedPlanOutputPartitioning_ or by performing 
> a repartition, but neither fixes the root cause.
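>  
> As an illustration, the first workaround can be applied at submit time (a 
> sketch only; class name, master URL, and jar path as in the spark-submit 
> command below):
> {code:java}
> # Workaround sketch: disable AQE's repartitioning of cached plan output,
> # so the cached parent keeps its original output partitioning.
> spark-submit \
>   --class ExampleApp \
>   --master spark://spark-master:6066 \
>   --deploy-mode cluster \
>   --conf spark.sql.optimizer.canChangeCachedPlanOutputPartitioning=false \
>   /path/to/test.jar
> {code}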
>  
> This issue is dangerous because the data loss occurs silently and, in the 
> absence of proper checks, can lead to wrong behaviour/results down the line. 
> We have therefore labeled it as a blocker.
>  
> There seems to be a file-size threshold beyond which the data loss is 
> observed (possibly implying that it happens once both executors start reading 
> the data file).
>  
> Minimal example:
> {code:java}
> // Read parent
> val parentData = session.read.format("avro").load("/data/shared/test/parent")
> // Self join parent and cache + materialize
> val parent = parentData.join(parentData, Seq("PID")).cache()
> parent.count()
> // Read child
> val child = session.read.format("avro").load("/data/shared/test/child")
> // Basic join
> val resultBasic = child.join(
>   parent,
>   parent("PID") === child("PARENT_ID")
> )
> // Count: 16479 (Wrong)
> println(s"Count no repartition: ${resultBasic.count()}")
> // Repartition parent join
> val resultRepartition = child.join(
>   parent.repartition(),
>   parent("PID") === child("PARENT_ID")
> )
> // Count: 50094 (Correct)
> println(s"Count with repartition: ${resultRepartition.count()}") {code}
>  
> Invalid count-only DAG:
>   !Screenshot 2024-02-07 at 20.10.07.png|width=519,height=853!
> Valid repartition DAG:
> !Screenshot 2024-02-07 at 20.09.44.png|width=368,height=1219!  
>  
> Spark submit for this job:
> {code:java}
> spark-submit \
>   --class ExampleApp \
>   --packages org.apache.spark:spark-avro_2.12:3.5.0 \
>   --deploy-mode cluster \
>   --master spark://spark-master:6066 \
>   --conf spark.sql.autoBroadcastJoinThreshold=-1 \
>   --conf spark.cores.max=3 \
>   --driver-cores 1 \
>   --driver-memory 1g \
>   --executor-cores 1 \
>   --executor-memory 1g \
>   /path/to/test.jar
> {code}
> The cluster should be set up as (worker1 (m+e), worker2 (e)) so as to split 
> the executors onto two workers.
> I have prepared a simple GitHub repository which contains a compilable 
> version of the example above:
> [https://github.com/ridvanappabugis/spark-3.5-issue]
>  


