> Maybe we should do it at runtime: if Spark retries a shuffle stage but the data becomes different (e.g. use checksum to check it), then Spark should retry all the partitions of this stage.
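A minimal sketch of the runtime check suggested in the quote above, written as a toy model rather than Spark code. The names `ShuffleChecksumSketch`, `MapOutput`, `checksum` and `partitionsToRerun` are hypothetical, and summing per-record CRC32 values to get an order-insensitive checksum is an assumption of this sketch, not something the thread specifies.

```scala
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

// Toy model, NOT Spark's shuffle code: a "map output" is simply the records
// that one map task wrote for each reduce partition.
object ShuffleChecksumSketch {
  type MapOutput = Map[Int, Seq[String]] // reduce partition id -> records

  // Order-insensitive checksum (assumption of this sketch): sum the CRC32 of
  // every (reduce partition, record) pair, so the same rows written in a
  // different order still compare equal.
  def checksum(output: MapOutput): Long =
    output.toSeq
      .flatMap { case (reduceId, rows) => rows.map(row => s"$reduceId:$row") }
      .map { record =>
        val crc = new CRC32()
        crc.update(record.getBytes(StandardCharsets.UTF_8))
        crc.getValue
      }
      .sum

  // firstAttempt: checksums recorded when each map task first completed.
  // retry: checksums produced by the map tasks that were rerun.
  // If any rerun task disagrees with its earlier output, every map partition
  // of the stage is scheduled again; otherwise nothing more needs to rerun.
  def partitionsToRerun(
      numMapPartitions: Int,
      firstAttempt: Map[Int, Long],
      retry: Map[Int, Long]): Set[Int] = {
    val changed = retry.exists { case (mapId, sum) =>
      firstAttempt.get(mapId).exists(_ != sum)
    }
    if (changed) (0 until numMapPartitions).toSet else Set.empty[Int]
  }
}
```

In this model a salted join key that lands in a different reduce partition on retry changes the checksum, which is the signal to rerun the whole stage.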
Having been bitten hard by this behavior in Spark recently, and having gone
down the rabbit hole to investigate, I think we should definitely try to
refine and implement this approach. The non-determinism depends on several
factors, including the data types of the columns or the use of repartition;
it is not easy for users to reason about this behavior.

PS: The JIRA that I found, which helped me confirm my understanding of the
problem and educate our internal users and platform team:
https://issues.apache.org/jira/browse/SPARK-38388. The checksum approach was
brought up in that JIRA too, and I feel it is the balanced way to look at
this problem.

Kind regards
Santosh

On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com> wrote:

> Hi,
> Following up on the issue with some information:
> 1) I have opened a PR to fix the isInDeterminate method in Stage, RDD,
> etc.
> 2) After tweaking the source code, I have been able to reproduce the data
> loss issue reliably, but I cannot productize the test, as it requires a lot
> of inline interception and coaxing to induce the test failure in a
> single-VM unit test. I will post a diff of the source code patch and the
> bug test.
> 3) I have identified a race condition in the current DagScheduler code
> base:
> In the current DagScheduler code, in the function *handleTaskCompletion*,
> when a task fails, before resubmitting the failure as ResubmitFailedStage,
> it checks whether the shuffle is inDeterminate. If it is, the stage is
> submitted ONLY IF the ResultStage has ZERO partitions processed. That is
> right, but the operation is not atomic up to the point where
> missingPartitions is invoked on the stage. As a result, it is possible that
> a task from the current attempt marks one of the partitions as successful,
> and when the new attempt is made, instead of getting all partitions as
> missing partitions, it fetches only some of them.
> I will be modifying the PR to fix the race.
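The check-then-act gap described in point 3 can be illustrated with a toy model. This is not the DagScheduler code: `ToyStage`, `racyResubmit`, `atomicResubmit` and the `between` hook (which stands in for a task of the old attempt finishing at the worst possible moment) are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

object ResubmitRaceSketch {
  // Toy stage that only tracks which partitions have finished.
  final class ToyStage(val numPartitions: Int) {
    private val finished = mutable.Set.empty[Int]

    def markFinished(partition: Int): Unit =
      this.synchronized { finished += partition }

    def missingPartitions: Seq[Int] =
      this.synchronized { (0 until numPartitions).filterNot(finished.contains) }

    // Racy variant: the "nothing finished yet" check and the later
    // missingPartitions call are two separate steps, so a task finishing
    // in between shrinks the set of partitions that gets retried.
    def racyResubmit(between: () => Unit): Option[Seq[Int]] = {
      val nothingFinished = this.synchronized { finished.isEmpty }
      between() // a task of the old attempt completes here
      if (nothingFinished) Some(missingPartitions) else None
    }

    // Atomic variant: the decision and the partition list are taken in one
    // critical section, so a late completion cannot change the retry plan.
    def atomicResubmit(between: () => Unit): Option[Seq[Int]] = {
      val plan = this.synchronized {
        if (finished.isEmpty) Some((0 until numPartitions).toSeq) else None
      }
      between()
      plan
    }
  }
}
```

A real fix would presumably also have to discard completions reported by the stale attempt; the sketch only shows why the check and the partition lookup need to happen together.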
> Regards
> Asif
>
> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
> wrote:
>
>> Shouldn't it be possible to determine from static data whether the output
>> will be deterministic? Expressions already have a deterministic flag. So
>> when an attribute is created from an alias, it will be possible to know
>> whether the attribute is pointing to an inDeterminate component.
>>
>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> It looks like a hard problem to statically analyze the query plan and
>>> decide whether a Spark stage is deterministic or not. When I added
>>> RDD DeterministicLevel, I thought it was not a hard problem for the
>>> callers to specify it, but it seems I was wrong.
>>>
>>> Maybe we should do it at runtime: if Spark retries a shuffle stage but
>>> the data becomes different (e.g. use checksum to check it), then Spark
>>> should retry all the partitions of this stage. I'll look into this repro
>>> after I'm back from the national holiday.
>>>
>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com>
>>> wrote:
>>>
>>>> I am using the test below. As a unit test, though, it will pass,
>>>> because simulating a lost executor in a single VM is difficult, but
>>>> there is definitely a bug.
>>>> If you check with a debugger, ShuffleStage.isDeterminate turns out to
>>>> be true, though it clearly should not be.
>>>> As a result, if you look at the DagScheduler, TaskSchedulerImpl and
>>>> TaskSet code, it will not retry the ShuffleStage fully; instead it
>>>> would only retry the missing tasks.
>>>> This means that on retry, if the join used a random value which puts a
>>>> row in an already completed shuffle task, the results will be missed.
>>>>
>>>>> package org.apache.spark.sql.vectorized
>>>>>
>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>> import org.apache.spark.sql.functions._
>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>> import org.apache.spark.sql.types.LongType
>>>>>
>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>   import testImplicits._
>>>>>   /* test("no retries") {
>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>       val baseDf = spark.createDataset(
>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), (null, "y"), (null, "z")))(
>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>
>>>>>       val leftOuter = baseDf.select(
>>>>>         $"strleft", when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>       leftOuter.show(10000)
>>>>>
>>>>>       val innerRight = spark.createDataset(
>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>
>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
>>>>>
>>>>>       innerjoin.show(1000)
>>>>>
>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>
>>>>>       outerjoin.show(1000)
>>>>>     }
>>>>>   } */
>>>>>
>>>>>   test("with retries") {
>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>       withTable("outer", "inner") {
>>>>>         createBaseTables()
>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>
>>>>>         println("Initial data")
>>>>>         outerjoin.show(1000)
>>>>>         val correctRows = outerjoin.collect()
>>>>>         for( i <- 0 until 100) {
>>>>>           FileScanRDD.throwException = false
>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>           import scala.jdk.CollectionConverters._
>>>>>           val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>           println("after retry data")
>>>>>           temp.show(1000)
>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>           correctRows.foreach(r => {
>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>               r.getString(0) == x.getString(0) &&
>>>>>               (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && r.isNullAt(2) &&
>>>>>                 x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>               ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
>>>>>               ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
>>>>>             assert(index >= 0)
>>>>>             retriedResults.remove(index)
>>>>>           }
>>>>>           )
>>>>>           assert(retriedResults.isEmpty)
>>>>>         }
>>>>>
>>>>>         // Thread.sleep(10000000)
>>>>>       }
>>>>>     }
>>>>>   }
>>>>>
>>>>>   private def createBaseTables(): Unit = {
>>>>>     /*val outerDf = spark.createDataset(
>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>     outerDf.write.format("parquet").saveAsTable("outer")*/
>>>>>
>>>>>     val outerDf = spark.createDataset(
>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>
>>>>>     outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>
>>>>>     /*val innerDf = spark.createDataset(
>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")*/
>>>>>
>>>>>     val innerDf = spark.createDataset(
>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>
>>>>>     innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>
>>>>>     val innerInnerDf = spark.createDataset(
>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>
>>>>>     // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>   }
>>>>>
>>>>>   private def getOuterJoinDF = {
>>>>>     val leftOuter = spark.table("outer").select(
>>>>>       $"strleft", when(isnull($"pkLeftt"), floor(rand() * Literal(10000000L)).
>>>>>         cast(LongType)).
>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>
>>>>>     val innerRight = spark.table("inner")
>>>>>     // val innerinnerRight = spark.table("innerinner")
>>>>>
>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>     outerjoin
>>>>>     /*
>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>     outerOuterJoin
>>>>>
>>>>>     */
>>>>>   }
>>>>> }
>>>>>
>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Sure. I will send a prototypical query tomorrow. Though it is
>>>>> difficult to simulate the issue with a unit test, I think the issue
>>>>> is that Rdd.isIndeterminate is not returning true for the query. As a
>>>>> result, on retry, the shuffle stage is not reattempted fully.
>>>>> The RDD is not reported as inDeterminate because ShuffleRdd does not
>>>>> account for it, as the ShuffleDependency does not take into account
>>>>> the inDeterminate nature of the HashPartitioner.
>>>>> Moreover, the attribute ref provided to the HashPartitioner, though
>>>>> it points to an inDeterminate alias, has its deterministic flag set
>>>>> to true (which in itself is logically correct).
>>>>> So I feel that, apart from the ShuffleDependency code change, all
>>>>> expressions should have a lazy Boolean, say
>>>>> containsIndeterministicComponent, which will be true if the
>>>>> expression is indeterministic or contains any attribute ref for which
>>>>> that flag is true.
>>>>>
>>>>> And on a personal note: thanks for your interest, this is a very rare
>>>>> attitude.
>>>>> Regards
>>>>> Asif
>>>>>
>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <angel.alvarez.pas...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Asif,
>>>>>>
>>>>>> Could you provide an example (code+dataset) to analyze this? Looks
>>>>>> interesting ...
>>>>>>
>>>>>> Regards,
>>>>>> Ángel
>>>>>>
>>>>>> On Sun, Jan 26, 2025 at 20:58, Asif Shahid (<asif.sha...@gmail.com>)
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> On further thought, I concur that leaf expressions like
>>>>>>> AttributeRefs can always be considered deterministic because, like
>>>>>>> a Java variable, the value contained in one is invariant per
>>>>>>> iteration (except when changed by some deterministic logic). So in
>>>>>>> that sense, what I described as an issue in my earlier mail is
>>>>>>> incorrect.
>>>>>>> But I think that AttributeRef should have a boolean method which
>>>>>>> tells whether the value it represents is from an indeterminate
>>>>>>> source or not.
>>>>>>> Regards
>>>>>>> Asif
>>>>>>>
>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I was testing a use case where the query had an outer join such
>>>>>>>> that the joining key of the left outer table had either a valid
>>>>>>>> value or a random value (salting to avoid skew).
>>>>>>>> The case was reported to produce incorrect results when a node
>>>>>>>> failure triggered a retry.
>>>>>>>> On debugging the code, I found the following, which has left me
>>>>>>>> confused as to what Spark's strategy is for indeterministic
>>>>>>>> fields.
>>>>>>>> Some serious issues are:
>>>>>>>> 1) All leaf expressions, like AttributeReference, are always
>>>>>>>> considered deterministic. This means that if an attribute is
>>>>>>>> pointing to an Alias which itself is indeterministic, the
>>>>>>>> attribute will still be considered deterministic.
>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>> Operator supports indeterministic values or not. Join is not
>>>>>>>> included in the list of supported operators, but it passes even if
>>>>>>>> the joining key is pointing to an indeterministic alias. (When I
>>>>>>>> tried fixing it, I found a plethora of operators failing, like
>>>>>>>> DeserializedObject, LocalRelation etc., which are not supposed to
>>>>>>>> contain indeterministic attributes because they are not in the
>>>>>>>> list of supporting operators.)
>>>>>>>> 3) The ShuffleDependency does not check for the indeterministic
>>>>>>>> nature of the partitioner (I fixed it locally and then realized
>>>>>>>> that bug #1 needs to be fixed too).
>>>>>>>>
>>>>>>>> The code in DagScheduler / TaskSet, TaskScheduler etc. seems to
>>>>>>>> have been written keeping in mind the indeterministic nature of
>>>>>>>> the previous and current stages, so as to re-execute previous
>>>>>>>> stages as a whole instead of just the missing tasks, but the above
>>>>>>>> 3 points do not seem to support the code of DagScheduler /
>>>>>>>> TaskScheduler.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> Asif
>>>>>>>>
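The flag proposed in this thread could look roughly like the following toy expression tree. It does not use Catalyst's classes: `Expr`, `Lit`, `Rand`, `Add`, `Alias`, `AttrRef` and the `origins` helper are hypothetical, and only the idea (a leaf stays deterministic while still reporting that its value derives from an indeterministic alias) comes from the mails above.

```scala
object IndeterminacySketch {
  // Toy expression tree, NOT Catalyst's Expression hierarchy.
  sealed trait Expr {
    def children: Seq[Expr]

    // A node is deterministic when all of its children are.
    def deterministic: Boolean = children.forall(_.deterministic)

    // What this node's value is derived from. Defaults to the children;
    // an attribute reference overrides it to point at the alias it names.
    protected def origins: Seq[Expr] = children

    // Hypothetical flag from the thread: true if this node, or anything its
    // value is derived from, is indeterministic.
    lazy val containsIndeterministicComponent: Boolean =
      !deterministic || origins.exists(_.containsIndeterministicComponent)
  }

  final case class Lit(value: Long) extends Expr {
    def children: Seq[Expr] = Nil
  }

  case object Rand extends Expr {
    def children: Seq[Expr] = Nil
    override def deterministic: Boolean = false
  }

  final case class Add(left: Expr, right: Expr) extends Expr {
    def children: Seq[Expr] = Seq(left, right)
  }

  final case class Alias(child: Expr, name: String) extends Expr {
    def children: Seq[Expr] = Seq(child)
  }

  // Leaf: always deterministic, but it remembers the alias it came from.
  final case class AttrRef(name: String, source: Option[Alias]) extends Expr {
    def children: Seq[Expr] = Nil
    override protected def origins: Seq[Expr] = source.toSeq
  }

  // A salted join key in the spirit of the use case: alias over rand() + 100.
  val saltedKey: AttrRef =
    AttrRef("pkLeft", Some(Alias(Add(Rand, Lit(100L)), "pkLeft")))
}
```

Here `saltedKey.deterministic` stays true, matching the point that leaf expressions are logically deterministic, while `saltedKey.containsIndeterministicComponent` is true, which is the information a partitioner or shuffle dependency would need in this model.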