> Have identified a race condition in the current DAGScheduler code base
Does it lead to the failure of full-stage retry? Can you also share your PR so that we can take a closer look? BTW, I'm working with my colleagues on this runtime checksum idea. We have something similar in our internal infra and I think we can create a working solution shortly. On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <santosh.ping...@adyen.com> wrote: > > Maybe we should do it at runtime: if Spark retries a shuffle stage but > the data becomes different (e.g. use checksum to check it), then Spark > should retry all the partitions of this stage. > > Having been bitten hard recently by this behavior in Spark, and after having > gone down the rabbit hole to investigate, we should definitely try to > refine and implement this approach. The non-determinism depends > on several factors, including the datatypes of the columns or the use of > repartition; it is not easy for users to reason about this behavior. > > PS: The JIRA that I found, which helped me confirm my understanding of the > problem, plus educate internal users and the platform team: > https://issues.apache.org/jira/browse/SPARK-38388. The checksum approach was > brought up in that JIRA too, and I feel that is the balanced way to look at > this problem. > > Kind regards > Santosh > > On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com> wrote: > >> Hi, >> Following up on the issue with some information: >> 1) Have opened a PR to fix the isInDeterminate method in Stage, >> RDD, etc. >> 2) After tweaking the source code, I have been able to reproduce the data >> loss issue reliably, but cannot productize the test, as it requires a lot >> of inline interception and coaxing to induce the test failure in a single VM >> unit test. Will post a diff of the source code patch and the bug test.
>> 3) Have identified a race condition in the current DAGScheduler code >> base: >> In the current DAGScheduler code, in the function *handleTaskCompletion*: when >> a task fails, while resubmitting the failure as ResubmitFailedStage, prior >> to submitting, it checks if the shuffle is indeterminate. If it is, then the >> stage is resubmitted ONLY IF the ResultStage has ZERO partitions processed. >> Which is right, but it is not an atomic operation up to the point where >> missingPartitions on the stage is invoked. As a result it is possible that >> one of the tasks from the current attempt marks one of the partitions as >> successful, and when the new attempt is made, instead of getting the missing >> partitions as the total partitions, it fetches only some of them. >> Will be modifying the PR to fix the race. >> Regards >> Asif >> >> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com> >> wrote: >> >>> Shouldn't it be possible to determine statically if the output will >>> be deterministic? Expressions already have a deterministic flag. So when an >>> attribute is created from an alias, it would be possible to know if the attribute >>> is pointing to an indeterminate component. >>> >>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com> wrote: >>> >>>> It looks like a hard problem to statically analyze the query plan and >>>> decide whether a Spark stage is deterministic or not. When I added >>>> RDD DeterministicLevel, I thought it was not a hard problem for the callers >>>> to specify it, but it seems I was wrong. >>>> >>>> Maybe we should do it at runtime: if Spark retries a shuffle stage but >>>> the data becomes different (e.g. use checksum to check it), then Spark >>>> should retry all the partitions of this stage. I'll look into this repro >>>> after I'm back from the national holiday. >>>> >>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com> >>>> wrote: >>>>> >>>>> I am using the below test.
As a unit test, though, it will pass, since it is difficult to >>>>> simulate executor loss in a single VM, but there is >>>>> definitely >>>>> a bug. >>>>> If you check with a debugger, ShuffleStage.isDeterminate turns >>>>> out to be true, though it clearly should not be. >>>>> As a result, if you look at the DAGScheduler, TaskSchedulerImpl and TaskSet >>>>> code, it will not retry the ShuffleStage fully; instead it would only retry >>>>> the missing tasks. >>>>> Which means that on retry, if the join used a random value which puts a row in >>>>> an already completed shuffle task, the results will be missed. >>>>> >>>>>> package org.apache.spark.sql.vectorized >>>>>> >>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2 >>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest} >>>>>> import org.apache.spark.sql.catalyst.expressions.Literal >>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD >>>>>> import org.apache.spark.sql.functions._ >>>>>> import org.apache.spark.sql.internal.SQLConf >>>>>> import org.apache.spark.sql.test.SharedSparkSession >>>>>> import org.apache.spark.sql.types.LongType >>>>>> >>>>>> class BugTest extends QueryTest with SharedSparkSession { >>>>>> import testImplicits._ >>>>>> /* test("no retries") { >>>>>> withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { >>>>>> val baseDf = spark.createDataset( >>>>>> Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), >>>>>> (null, "y"), (null, "z")))( >>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>> Encoders.STRING)).toDF("pkLeft", "strleft") >>>>>> >>>>>> val leftOuter = baseDf.select( >>>>>> $"strleft", when(isnull($"pkLeft"), >>>>>> monotonically_increasing_id() + Literal(100)).
>>>>>> otherwise($"pkLeft").as("pkLeft")) >>>>>> leftOuter.show(10000) >>>>>> >>>>>> val innerRight = spark.createDataset( >>>>>> Seq((1L, "11"), (2L, "22"), (3L, "33")))( >>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>> Encoders.STRING)).toDF("pkRight", "strright") >>>>>> >>>>>> val innerjoin = leftOuter.join(innerRight, $"pkLeft" === >>>>>> $"pkRight", "inner") >>>>>> >>>>>> innerjoin.show(1000) >>>>>> >>>>>> val outerjoin = leftOuter.join(innerRight, $"pkLeft" === >>>>>> $"pkRight", "left_outer") >>>>>> >>>>>> outerjoin.show(1000) >>>>>> } >>>>>> } */ >>>>>> >>>>>> test("with retries") { >>>>>> withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { >>>>>> withTable("outer", "inner") { >>>>>> createBaseTables() >>>>>> val outerjoin: DataFrame = getOuterJoinDF >>>>>> >>>>>> println("Initial data") >>>>>> outerjoin.show(1000) >>>>>> val correctRows = outerjoin.collect() >>>>>> for( i <- 0 until 100) { >>>>>> FileScanRDD.throwException = false >>>>>> ZippedPartitionsRDD2.throwException = true >>>>>> val rowsAfterRetry = getOuterJoinDF.collect() >>>>>> import scala.jdk.CollectionConverters._ >>>>>> val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, >>>>>> outerjoin.schema) >>>>>> println("after retry data") >>>>>> temp.show(1000) >>>>>> assert(correctRows.length == rowsAfterRetry.length) >>>>>> val retriedResults = rowsAfterRetry.toBuffer >>>>>> correctRows.foreach(r => { >>>>>> val index = retriedResults.indexWhere(x => >>>>>> r.getString(0) == x.getString(0) && >>>>>> (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && >>>>>> r.isNullAt(2) && >>>>>> x.isNullAt(3) && r.isNullAt(3))) && >>>>>> ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == >>>>>> x.getLong(2)) && >>>>>> ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == >>>>>> x.getString(3))) >>>>>> assert(index >= 0) >>>>>> retriedResults.remove(index) >>>>>> } >>>>>> ) >>>>>> assert(retriedResults.isEmpty) >>>>>> } >>>>>> >>>>>> // Thread.sleep(10000000) >>>>>> } >>>>>> } >>>>>> } 
>>>>>> >>>>>> private def createBaseTables(): Unit = { >>>>>> /*val outerDf = spark.createDataset( >>>>>> Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, >>>>>> "cc"), (null, "cc")))( >>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>> Encoders.STRING)).toDF("pkLeft", "strleft") >>>>>> outerDf.write.format("parquet").saveAsTable("outer")*/ >>>>>> >>>>>> val outerDf = spark.createDataset( >>>>>> Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, >>>>>> "bb"), (null, "bb")))( >>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>> Encoders.STRING)).toDF("pkLeftt", "strleft") >>>>>> >>>>>> outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer") >>>>>> >>>>>> /*val innerDf = spark.createDataset( >>>>>> Seq((1L, "11"), (2L, "22"), (3L, "33")))( >>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>> Encoders.STRING)).toDF("pkRight", "strright")*/ >>>>>> >>>>>> val innerDf = spark.createDataset( >>>>>> Seq((1L, "11"), (2L, "11"), (3L, "33")))( >>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>> Encoders.STRING)).toDF("pkRight", "strright") >>>>>> >>>>>> >>>>>> innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner") >>>>>> >>>>>> val innerInnerDf = spark.createDataset( >>>>>> Seq((1L, "111"), (2L, "222"), (3L, "333")))( >>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>> Encoders.STRING)).toDF("pkpkRight", "strstrright") >>>>>> >>>>>> // innerInnerDf.write.format("parquet").saveAsTable("innerinner") >>>>>> } >>>>>> >>>>>> private def getOuterJoinDF = { >>>>>> val leftOuter = spark.table("outer").select( >>>>>> $"strleft", when(isnull($"pkLeftt"), floor(rand() * >>>>>> Literal(10000000L)). >>>>>> cast(LongType)). >>>>>> otherwise($"pkLeftt").as("pkLeft")) >>>>>> >>>>>> val innerRight = spark.table("inner") >>>>>> // val innerinnerRight = spark.table("innerinner") >>>>>> >>>>>> val outerjoin = leftOuter.hint("shuffle_hash"). 
>>>>>> join(innerRight, $"pkLeft" === $"pkRight", "left_outer") >>>>>> outerjoin >>>>>> /* >>>>>> val outerOuterJoin = outerjoin.hint("shuffle_hash"). >>>>>> join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer") >>>>>> outerOuterJoin >>>>>> >>>>>> */ >>>>>> } >>>>>> } >>>>>> >>>>>> >>>>>> >>>>>> >>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com> >>>>> wrote: >>>>>> Sure, I will send a prototypical query tomorrow. Though it is difficult >>>>>> to simulate the issue using a unit test, I think the issue is that >>>>>> Rdd.isIndeterminate is not returning true for the query. As a result, >>>>>> on retry, the shuffle stage is not reattempted fully. >>>>>> And the RDD not returning inDeterminate as true is because the >>>>>> ShuffleDependency is not taking into >>>>>> account the inDeterminate nature of the HashPartitioner. >>>>>> Moreover, the attribute ref provided to the HashPartitioner, though it is >>>>>> pointing to an inDeterminate alias, has its deterministic flag as true >>>>>> ( >>>>>> which in itself is logically correct). >>>>>> So I feel that, apart from the ShuffleDependency code change, all expressions >>>>>> should have a lazy Boolean, say containsIndeterministic, which >>>>>> will be true if the expression is indeterministic or contains any >>>>>> attribute >>>>>> ref whose indeterministicComponent is true. >>>>>> >>>>>> And on a personal note.. thanks for your interest.. this is a very rare >>>>>> attitude. >>>>>> Regards >>>>>> Asif >>>>>> >>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <angel.alvarez.pas...@gmail.com> >>>>>> wrote: >>>>>>> Hi Asif, >>>>>>> Could you provide an example (code+dataset) to analyze this? Looks >>>>>>> interesting ...
>>>>>>> >>>>>>> Regards, >>>>>>> Ángel >>>>>>> >>>>>>> On Sun, Jan 26, 2025 at 20:58, Asif Shahid (<asif.sha...@gmail.com>) >>>>>>> wrote: >>>>>>>> Hi, >>>>>>>> On further thought, I concur that leaf expressions like >>>>>>>> AttributeRefs can always be considered to be deterministic, as, as a >>>>>>>> Java >>>>>>>> variable, the value contained in it per iteration is invariant (except >>>>>>>> when >>>>>>>> changed by some deterministic logic). So in that sense, what I called an issue in >>>>>>>> the >>>>>>>> above mail is incorrect. >>>>>>>> But I think that AttributeRef should have a boolean method which >>>>>>>> tells whether the value it represents is from an indeterminate source >>>>>>>> or >>>>>>>> not. >>>>>>>> Regards >>>>>>>> Asif >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <asif.sha...@gmail.com> >>>>>>>> wrote: >>>>>>>>> Hi, >>>>>>>>> While testing a use case where the query had an outer join such >>>>>>>>> that the joining key of the left outer table either had a valid value or a >>>>>>>>> random >>>>>>>>> value (salting to avoid skew), the case was reported to produce incorrect results in case of node >>>>>>>>> failure, with retry. >>>>>>>>> On debugging the code, I have found the following, which has left me >>>>>>>>> confused as to what Spark's strategy for indeterministic fields is. >>>>>>>>> Some serious issues are: >>>>>>>>> 1) All the leaf expressions, like AttributeReference, are always >>>>>>>>> considered deterministic, which means that if an attribute is pointing to >>>>>>>>> an >>>>>>>>> Alias which itself is indeterministic, the attribute will still be >>>>>>>>> considered deterministic. >>>>>>>>> 2) In CheckAnalysis there is code which checks whether each >>>>>>>>> operator supports indeterministic values or not. Join is not >>>>>>>>> included in the list of supported operators, but it passes even if the joining >>>>>>>>> key is
pointing to an indeterministic alias. (When I tried fixing it, I found >>>>>>>> a >>>>>>>> plethora of operators failing, like DeserializedObject, LocalRelation, >>>>>>>> etc., >>>>>>>> which are not supposed to contain indeterministic attributes, >>>>>>>> because they >>>>>>>> are not in the list of supporting operators.) >>>>>>>>> 3) The ShuffleDependency does not check for the indeterministic nature >>>>>>>>> of the partitioner (fixed it locally and then realized that there is >>>>>>>>> bug >>>>>>>>> #1, which needs to be fixed too). >>>>>>>>> >>>>>>>>> The code in the DAGScheduler, TaskSet, TaskScheduler, etc. seems to >>>>>>>>> have been written keeping in mind the indeterministic nature of the >>>>>>>>> previous and current stages, so as to re-execute previous stages as a >>>>>>>>> whole >>>>>>>>> instead of just the missing tasks, but the above 3 points do not seem to >>>>>>>>> support the code of the DAGScheduler / TaskScheduler. >>>>>>>>> >>>>>>>>> Regards >>>>>>>>> Asif >>>>>>>>>
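
The check-then-act window in the race condition described earlier in the thread (the "zero partitions processed" check versus the later missingPartitions read) can be sketched with a toy, single-threaded model. All names here are invented for illustration; this is not the real DAGScheduler code, which involves event loops and task schedulers:

```java
// Toy model of the check-then-act race: the resubmission path checks
// "zero partitions processed" and only later reads the missing partitions.
// A straggler task completing in between shrinks the retry to a subset of
// partitions, leaving stale (indeterminate) output in the completed one.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StageRetryRaceSketch {
    static class StageState {
        final int numPartitions;
        final Set<Integer> completed = new HashSet<>();
        StageState(int numPartitions) { this.numPartitions = numPartitions; }

        List<Integer> missingPartitions() {
            List<Integer> missing = new ArrayList<>();
            for (int p = 0; p < numPartitions; p++) {
                if (!completed.contains(p)) missing.add(p);
            }
            return missing;
        }
    }

    public static void main(String[] args) {
        StageState stage = new StageState(4);

        // Step 1: the resubmission check passes; no partitions are complete
        // yet, so an indeterminate stage may be resubmitted in full.
        boolean okToResubmit = stage.completed.isEmpty();
        System.out.println("check passed: " + okToResubmit);

        // Step 2: a straggler task from the OLD attempt reports success in
        // the window between the check and the missingPartitions() read.
        stage.completed.add(2);

        // Step 3: the new attempt now sees only 3 missing partitions
        // instead of all 4; partition 2 keeps its stale output.
        List<Integer> missing = stage.missingPartitions();
        System.out.println("retry covers " + missing.size() + " of "
                + stage.numPartitions + " partitions: " + missing);
    }
}
```

Making the check and the missing-partitions read a single atomic step (or re-validating the completed-partition count at submission time) closes this window, which is what the PR mentioned in the thread sets out to do.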
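
The containsIndeterministic idea discussed in the thread (a flag that survives the Alias-to-AttributeReference hop even though the attribute itself stays deterministic per row) can be sketched with toy types. These are invented classes, not Spark's actual Expression API:

```java
// Toy model of propagating an "indeterminate source" flag through an
// expression tree: a node is flagged when it is nondeterministic itself or
// when any child is, and an attribute created from an alias remembers the
// alias's flag even though the attribute reads deterministically per row.
import java.util.List;

public class IndeterminismFlagSketch {
    static abstract class Expr {
        boolean deterministic() { return true; }
        List<Expr> children() { return List.of(); }
        boolean containsIndeterminism() {
            if (!deterministic()) return true;
            return children().stream().anyMatch(Expr::containsIndeterminism);
        }
    }

    // A nondeterministic leaf, like rand() used for salting a join key.
    static class Rand extends Expr {
        @Override boolean deterministic() { return false; }
    }

    // An attribute reference just reads a column, so it is deterministic
    // per row, but it remembers whether its source was indeterminate.
    static class AttrRef extends Expr {
        final boolean sourceIndeterminate;
        AttrRef(boolean sourceIndeterminate) { this.sourceIndeterminate = sourceIndeterminate; }
        @Override boolean containsIndeterminism() { return sourceIndeterminate; }
    }

    static class Alias extends Expr {
        final Expr child;
        Alias(Expr child) { this.child = child; }
        @Override List<Expr> children() { return List.of(child); }
        AttrRef toAttribute() { return new AttrRef(containsIndeterminism()); }
    }

    public static void main(String[] args) {
        // pkLeft = alias over a rand()-based salted join key.
        AttrRef pkLeft = new Alias(new Rand()).toAttribute();
        System.out.println("deterministic per row: " + pkLeft.deterministic());
        System.out.println("from indeterminate source: " + pkLeft.containsIndeterminism());
    }
}
```

With such a flag, a ShuffleDependency whose partitioner is keyed on pkLeft could report the stage as indeterminate and trigger a full-stage retry, instead of relying on the per-row deterministic flag that is (correctly) true for the attribute.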