[ https://issues.apache.org/jira/browse/SPARK-47019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17816321#comment-17816321 ]
Bruce Robbins commented on SPARK-47019:
---------------------------------------

I can reproduce on my laptop using Spark 3.5.0 and {{--master "local-cluster[3,1,1024]"}}. However, I cannot reproduce on the latest branch-3.5 or master, so it seems to have been fixed, probably by SPARK-45592.

> AQE dynamic cache partitioning causes SortMergeJoin to result in data loss
> --------------------------------------------------------------------------
>
>              Key: SPARK-47019
>              URL: https://issues.apache.org/jira/browse/SPARK-47019
>          Project: Spark
>       Issue Type: Bug
>       Components: Optimizer, Spark Core
> Affects Versions: 3.5.0
>      Environment: Tested in 3.5.0
>                   Reproduced on, so far:
>                   - kubernetes deployment
>                   - docker cluster deployment
>                   Local cluster:
>                   - master
>                   - worker1 (2/2G)
>                   - worker2 (1/1G)
>         Reporter: Ridvan Appa Bugis
>         Priority: Blocker
>           Labels: DAG, caching, correctness, data-loss, dynamic_allocation, inconsistency, partitioning
>      Attachments: Screenshot 2024-02-07 at 20.09.44.png, Screenshot 2024-02-07 at 20.10.07.png, eventLogs-app-20240207175940-0023.zip, testdata.zip
>
> It seems we have encountered an issue with Spark AQE's dynamic cache partitioning which causes incorrect *count* output values and data loss.
> A similar issue could not be found, so I am creating this ticket to raise awareness.
>
> Preconditions:
> - Set up a cluster as per the environment specification
> - Prepare the test data (or any data set large enough to be read by both executors)
>
> Steps to reproduce:
> - Read the parent
> - Self-join the parent
> - Cache + materialize the parent
> - Join the parent with the child
>
> Performing a self-join over a parentDF, then caching + materializing the DF, and then joining it with a childDF results in an *incorrect* count value and {*}missing data{*}.
>
> Performing a *repartition* seems to fix the issue, most probably due to the rearrangement of the underlying partitions and a statistics update.
>
> This behaviour is observed on a multi-worker cluster with a job running 2 executors (1 per worker), when a data file large enough to be read by both executors is used.
> Not reproducible in local mode.
>
> Circumvention:
> So far this can be alleviated by disabling _spark.sql.optimizer.canChangeCachedPlanOutputPartitioning_ or by performing a repartition, but neither fixes the root cause.
>
> This issue is dangerous because the data loss occurs silently and, in the absence of proper checks, can lead to wrong behaviour/results down the line. We have therefore labeled it as a blocker.
>
> There seems to be a file-size threshold above which the data loss is observed (possibly implying that it happens when both executors start reading the data file).
>
> Minimal example:
> {code:java}
> // Read parent
> val parentData = session.read.format("avro").load("/data/shared/test/parent")
>
> // Self join parent and cache + materialize
> val parent = parentData.join(parentData, Seq("PID")).cache()
> parent.count()
>
> // Read child
> val child = session.read.format("avro").load("/data/shared/test/child")
>
> // Basic join
> val resultBasic = child.join(
>   parent,
>   parent("PID") === child("PARENT_ID")
> )
> // Count: 16479 (Wrong)
> println(s"Count no repartition: ${resultBasic.count()}")
>
> // Repartition parent join
> val resultRepartition = child.join(
>   parent.repartition(),
>   parent("PID") === child("PARENT_ID")
> )
> // Count: 50094 (Correct)
> println(s"Count with repartition: ${resultRepartition.count()}")
> {code}
>
> Invalid count-only DAG:
> !Screenshot 2024-02-07 at 20.10.07.png|width=519,height=853!
>
> Valid repartition DAG:
> !Screenshot 2024-02-07 at 20.09.44.png|width=368,height=1219!
>
> Spark submit for this job:
> {code:java}
> spark-submit \
>   --class ExampleApp \
>   --packages org.apache.spark:spark-avro_2.12:3.5.0 \
>   --deploy-mode cluster \
>   --master spark://spark-master:6066 \
>   --conf spark.sql.autoBroadcastJoinThreshold=-1 \
>   --conf spark.cores.max=3 \
>   --driver-cores 1 \
>   --driver-memory 1g \
>   --executor-cores 1 \
>   --executor-memory 1g \
>   /path/to/test.jar
> {code}
> The cluster should be set up as (worker1(m+e), worker2(e)) so as to split the executors onto two workers.
> I have prepared a simple GitHub repository containing a compilable version of the above example:
> [https://github.com/ridvanappabugis/spark-3.5-issue]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)