Hi Wenchen,
Apologies... I thought it was Santosh asking for the PR. And about the race
condition... yes, the race condition causes only some partitions to be
retried. Maybe a checksum can be used as a fallback mechanism? I suppose the
checksum would be computed on the executor side? Then there might be a
condition where shuffle files could be lost before the checksum is
communicated between the driver and the executors?
Regards
Asif
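A minimal Scala sketch of the checksum-fallback idea being discussed, under
stated assumptions: the hook points, object, and method names here are
invented for illustration and are not Spark's actual shuffle / MapStatus API.

import java.util.zip.CRC32
import scala.collection.concurrent.TrieMap

// Illustrative sketch only; not the actual Spark internals.
object ChecksumFallbackSketch {

  // Driver side: last reported checksum per (shuffleId, mapPartitionId).
  private val previousChecksums = TrieMap.empty[(Int, Int), Long]

  // Executor side: checksum the serialized shuffle blocks before reporting
  // the map output. If the shuffle files (and the checksum with them) are
  // lost before being reported, the driver simply has nothing to compare
  // against and falls back to the current behavior.
  def checksumOf(blocks: Iterator[Array[Byte]]): Long = {
    val crc = new CRC32()
    blocks.foreach(b => crc.update(b))
    crc.getValue
  }

  // Driver side: when a map task is re-run, compare the new checksum with
  // the previous attempt's. A mismatch means the stage output changed, i.e.
  // it is indeterminate, so every partition of the downstream stage should
  // be retried, not just the ones that are still missing.
  def mustRetryAllPartitions(shuffleId: Int, mapId: Int, newChecksum: Long): Boolean = {
    previousChecksums.put((shuffleId, mapId), newChecksum) match {
      case Some(previous) => previous != newChecksum
      case None           => false // first attempt: nothing to compare yet
    }
  }
}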
On Thu, Feb 13, 2025 at 7:39 PM Asif Shahid <asif.sha...@gmail.com> wrote:

> The bug-repro patch, when applied on current master, will show failure
> with incorrect results.
> While on the PR branch, it will pass.
> The number of iterations in the test is 100.
>
> Regards
> Asif
>
> On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>
>> Hi,
>> Following up on this issue.
>> The PR opened is:
>> 1) https://github.com/apache/spark/pull/49708
>> The PR, IMO, fixes the 3 issues which I have described below.
>> The race condition fix requires the use of a read-write lock.
>>
>> 2) Jira is:
>> https://issues.apache.org/jira/browse/SPARK-51016
>>
>> A checksum approach would result in a greater number of retries, I
>> believe; there is already a mechanism in Expression to identify
>> indeterminism.
>>
>> Anyway, the issues, which I think are multiple and which the opened PR
>> addresses, are:
>> 1) Stage.isIndeterminate is buggy, as it is not able to identify
>> indeterminacy; the fix spans files such as ShuffleDependency, RDD, etc.
>> 2) The race condition in DAGScheduler:
>>    a) Say task1, corresponding to partition P1 of an indeterminate
>>       result stage, fails with a fetch failure.
>>    b) The code rightly checks that no previous partitions of the result
>>       stage are successful yet; instead of throwing an exception and
>>       aborting, it decides to retry.
>>    c) During this retry window, that is, between checking dependencies
>>       and making the new attempt initiated by task1, task2, corresponding
>>       to partition P2 of the result stage, becomes successful.
>>    d) The new attempt now sees n-1 missing partitions instead of all of
>>       them, and that creates data corruption, as it is no longer retrying
>>       all partitions (a minimal sketch of this window appears below).
>>
>> 3) The DAGSchedulerSuite test
>>    a) test("SPARK-25341: retry all the succeeding stages when the
>>       map stage is indeterminate")
>>    has the following assertion:
>>
>>    assert(failedStages.collect {
>>      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>    }.head.findMissingPartitions() == Seq(0))
>>
>>    If my understanding is right, and as seen from the outcome of the
>>    opened PR, since the ShuffleMapStage turns out to be indeterminate,
>>    all tasks should be retried, so the assertion's expected value should
>>    be Seq(0, 1).
>>
>>    b) If the above #a is valid, then the second test,
>>    test("SPARK-32923: handle stage failure for indeterminate map stage
>>    with push-based shuffle"),
>>    which has the assertion:
>>
>>    assert(failedStages.collect {
>>      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>    }.head.findMissingPartitions() == Seq(0))
>>
>>    should also instead expect Seq(0, 1).
>>
>> I am also attaching a patch which I was using as a basis to debug and
>> fix. The patch modifies the source code, using inline code interception
>> and tweaks, to reproduce the data corruption issue in a unit test.
>>
>> And I am also attaching the BugTest which I used.
>>
>> The test is supposed to pass either if it encounters an exception from
>> the Spark layer with the message
>> "Please eliminate the indeterminacy by checkpointing the RDD before
>> repartition and try again",
>> or if the results of the retry are as expected (12 rows, along with a
>> data correctness check).
>>
>> Meanwhile I will try to productize the bug test.
>>
>> Would appreciate clarification on test issue #3. And a review of the PR,
>> if done, would be appreciated much more.
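To make the window in 2(c) and 2(d) above concrete, here is a minimal,
self-contained Scala sketch. The class and method names are invented for
illustration and this is not the actual DAGScheduler code; it only shows why
making the "no partition finished yet" check and the retry decision one
atomic step (for example with a read-write lock) closes the race.

import java.util.concurrent.locks.ReentrantReadWriteLock

// Sketch of the race window for an indeterminate result stage.
class IndeterminateResultStageSketch(numPartitions: Int) {
  private val finished = Array.fill(numPartitions)(false)
  private val lock = new ReentrantReadWriteLock()

  // Task-completion path: independent partitions may be marked concurrently,
  // so the read lock is enough here.
  def markPartitionFinished(p: Int): Unit = {
    lock.readLock().lock()
    try finished(p) = true
    finally lock.readLock().unlock()
  }

  // Fetch-failure path: the "no partition has finished yet" check and the
  // computation of the partitions to retry must happen atomically, hence the
  // write lock. Without it, a completion can slip in between the check and
  // findMissingPartitions(), so the retry covers only n-1 partitions and the
  // indeterminate stage can produce corrupt results.
  def handleFetchFailure(retryAll: Seq[Int] => Unit, abortQuery: () => Unit): Unit = {
    lock.writeLock().lock()
    try {
      if (finished.forall(done => !done)) retryAll(0 until numPartitions)
      else abortQuery() // some output may already be visible downstream
    } finally {
      lock.writeLock().unlock()
    }
  }
}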
>>
>> Santosh,
>> I believe my PR would do a full partition retry (if the fetch failure
>> happened prior to any other final task being successful; otherwise it
>> will abort the query by throwing an exception). No wrong results, I
>> believe.
>> Your review will be much appreciated.
>> Regards
>> Asif
>>
>>
>> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> > Have identified a race condition in the current DAGScheduler code base
>>>
>>> Does it lead to the failure of full-stage retry? Can you also share your
>>> PR so that we can take a closer look?
>>>
>>> BTW, I'm working with my colleagues on this runtime checksum idea. We
>>> have something similar in our internal infra and I think we can create a
>>> working solution shortly.
>>>
>>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <
>>> santosh.ping...@adyen.com> wrote:
>>>
>>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>> but the data becomes different (e.g. use checksum to check it), then
>>>> Spark should retry all the partitions of this stage.
>>>>
>>>> Having been bitten hard by this behavior in Spark recently, and after
>>>> having gone down the rabbit hole to investigate, we should definitely
>>>> try to refine and implement this approach. The non-determinism depends
>>>> on several factors, including the datatypes of the columns and the use
>>>> of repartition; it is not easy for users to reason about this behavior.
>>>>
>>>> PS: The JIRA that I found, which helped me confirm my understanding of
>>>> the problem and educate internal users and the platform team:
>>>> https://issues.apache.org/jira/browse/SPARK-38388. The checksum
>>>> approach was brought up in that JIRA too, and I feel that is the
>>>> balanced way to look at this problem.
>>>>
>>>> Kind regards
>>>> Santosh
>>>>
>>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Following up on the issue with some information:
>>>>> 1) Have opened a PR to fix the isIndeterminate method in Stage, RDD,
>>>>> etc.
>>>>> 2) After tweaking the source code, I have been able to reproduce the
>>>>> data loss issue reliably, but cannot productize the test, as it
>>>>> requires a lot of inline interception and coaxing to induce the test
>>>>> failure in a single-VM unit test. Will post a diff of the source code
>>>>> patch and the bug test.
>>>>> 3) Have identified a race condition in the current DAGScheduler code
>>>>> base:
>>>>> In the current DAGScheduler code, in the function
>>>>> *handleTaskCompletion*, when a task fails, while resubmitting the
>>>>> failure as ResubmitFailedStages, prior to submitting, it checks
>>>>> whether the shuffle is indeterminate. If it is, then the stage is
>>>>> submitted ONLY IF the ResultStage has ZERO partitions processed.
>>>>> Which is right, but it is not an atomic operation up to the point
>>>>> where findMissingPartitions() is invoked on the stage. As a result it
>>>>> is possible that one of the tasks from the current attempt marks one
>>>>> of the partitions as successful, and when the new attempt is made,
>>>>> instead of treating all partitions as missing, it fetches only some
>>>>> of them.
>>>>> Will be modifying the PR to fix the race.
>>>>> Regards
>>>>> Asif
>>>>>
>>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Shouldn't it be possible to determine statically whether the output
>>>>>> will be deterministic? Expressions already have a deterministic flag.
So >>>>>> when an attribute is created from alias, it will be possible to know if >>>>>> attribute is pointing to an inDeterminate component. >>>>>> >>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com> >>>>>> wrote: >>>>>> >>>>>>> It looks like a hard problem to statically analyze the query plan >>>>>>> and decide whether a Spark stage is deterministic or not. When I added >>>>>>> RDD DeterministicLevel, I thought it was not a hard problem for the >>>>>>> callers >>>>>>> to specify it, but seems I was wrong. >>>>>>> >>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle stage >>>>>>> but the data becomes different (e.g. use checksum to check it), then >>>>>>> Spark >>>>>>> should retry all the partitions of this stage. I'll look into this repro >>>>>>> after I'm back from the national holiday. >>>>>>> >>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com> >>>>>>> wrote: >>>>>>> >>>>>>>> >>>>>>>> I am using below test. as a unit test though , it will pass, as >>>>>>>> to simulate executor lost in a single vm is difficult, but there is >>>>>>>> definitely a bug. >>>>>>>> Using debugger if you check, the ShuffleStage.isDeterminate is >>>>>>>> turning out to be true, though it clearly should not be. >>>>>>>> As result if you look at DagScheduler, TaskSchedulerImpl and >>>>>>>> TaskSet code, it will not retry the ShuffleStage fully, instead would >>>>>>>> only >>>>>>>> retry missing task. >>>>>>>> Which means on retry if the join used a random value which puts it >>>>>>>> in already completed shuffle task, the results will be missed. >>>>>>>> >>>>>>>>> package org.apache.spark.sql.vectorized >>>>>>>>> >>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2 >>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest} >>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal >>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD >>>>>>>>> import org.apache.spark.sql.functions._ >>>>>>>>> import org.apache.spark.sql.internal.SQLConf >>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession >>>>>>>>> import org.apache.spark.sql.types.LongType >>>>>>>>> >>>>>>>>> class BugTest extends QueryTest with SharedSparkSession { >>>>>>>>> import testImplicits._ >>>>>>>>> /* test("no retries") { >>>>>>>>> withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { >>>>>>>>> val baseDf = spark.createDataset( >>>>>>>>> Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, >>>>>>>>> "x"), (null, "y"), (null, "z")))( >>>>>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>>>>> Encoders.STRING)).toDF("pkLeft", "strleft") >>>>>>>>> >>>>>>>>> val leftOuter = baseDf.select( >>>>>>>>> $"strleft", when(isnull($"pkLeft"), >>>>>>>>> monotonically_increasing_id() + Literal(100)). 
>>>>>>>>> otherwise($"pkLeft").as("pkLeft")) >>>>>>>>> leftOuter.show(10000) >>>>>>>>> >>>>>>>>> val innerRight = spark.createDataset( >>>>>>>>> Seq((1L, "11"), (2L, "22"), (3L, "33")))( >>>>>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>>>>> Encoders.STRING)).toDF("pkRight", "strright") >>>>>>>>> >>>>>>>>> val innerjoin = leftOuter.join(innerRight, $"pkLeft" === >>>>>>>>> $"pkRight", "inner") >>>>>>>>> >>>>>>>>> innerjoin.show(1000) >>>>>>>>> >>>>>>>>> val outerjoin = leftOuter.join(innerRight, $"pkLeft" === >>>>>>>>> $"pkRight", "left_outer") >>>>>>>>> >>>>>>>>> outerjoin.show(1000) >>>>>>>>> } >>>>>>>>> } */ >>>>>>>>> >>>>>>>>> test("with retries") { >>>>>>>>> withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { >>>>>>>>> withTable("outer", "inner") { >>>>>>>>> createBaseTables() >>>>>>>>> val outerjoin: DataFrame = getOuterJoinDF >>>>>>>>> >>>>>>>>> println("Initial data") >>>>>>>>> outerjoin.show(1000) >>>>>>>>> val correctRows = outerjoin.collect() >>>>>>>>> for( i <- 0 until 100) { >>>>>>>>> FileScanRDD.throwException = false >>>>>>>>> ZippedPartitionsRDD2.throwException = true >>>>>>>>> val rowsAfterRetry = getOuterJoinDF.collect() >>>>>>>>> import scala.jdk.CollectionConverters._ >>>>>>>>> val temp = >>>>>>>>> spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema) >>>>>>>>> println("after retry data") >>>>>>>>> temp.show(1000) >>>>>>>>> assert(correctRows.length == rowsAfterRetry.length) >>>>>>>>> val retriedResults = rowsAfterRetry.toBuffer >>>>>>>>> correctRows.foreach(r => { >>>>>>>>> val index = retriedResults.indexWhere(x => >>>>>>>>> r.getString(0) == x.getString(0) && >>>>>>>>> (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && >>>>>>>>> r.isNullAt(2) && >>>>>>>>> x.isNullAt(3) && r.isNullAt(3))) && >>>>>>>>> ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == >>>>>>>>> x.getLong(2)) && >>>>>>>>> ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) >>>>>>>>> == x.getString(3))) >>>>>>>>> assert(index >= 0) >>>>>>>>> retriedResults.remove(index) >>>>>>>>> } >>>>>>>>> ) >>>>>>>>> assert(retriedResults.isEmpty) >>>>>>>>> } >>>>>>>>> >>>>>>>>> // Thread.sleep(10000000) >>>>>>>>> } >>>>>>>>> } >>>>>>>>> } >>>>>>>>> >>>>>>>>> private def createBaseTables(): Unit = { >>>>>>>>> /*val outerDf = spark.createDataset( >>>>>>>>> Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, >>>>>>>>> "cc"), (null, "cc")))( >>>>>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>>>>> Encoders.STRING)).toDF("pkLeft", "strleft") >>>>>>>>> outerDf.write.format("parquet").saveAsTable("outer")*/ >>>>>>>>> >>>>>>>>> val outerDf = spark.createDataset( >>>>>>>>> Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, >>>>>>>>> "bb"), (null, "bb")))( >>>>>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>>>>> Encoders.STRING)).toDF("pkLeftt", "strleft") >>>>>>>>> >>>>>>>>> outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer") >>>>>>>>> >>>>>>>>> /*val innerDf = spark.createDataset( >>>>>>>>> Seq((1L, "11"), (2L, "22"), (3L, "33")))( >>>>>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>>>>> Encoders.STRING)).toDF("pkRight", "strright")*/ >>>>>>>>> >>>>>>>>> val innerDf = spark.createDataset( >>>>>>>>> Seq((1L, "11"), (2L, "11"), (3L, "33")))( >>>>>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>>>>> Encoders.STRING)).toDF("pkRight", "strright") >>>>>>>>> >>>>>>>>> >>>>>>>>> innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner") >>>>>>>>> >>>>>>>>> val innerInnerDf = spark.createDataset( >>>>>>>>> Seq((1L, "111"), (2L, "222"), (3L, 
"333")))( >>>>>>>>> Encoders.tupleEncoder(Encoders.LONG, >>>>>>>>> Encoders.STRING)).toDF("pkpkRight", "strstrright") >>>>>>>>> >>>>>>>>> // innerInnerDf.write.format("parquet").saveAsTable("innerinner") >>>>>>>>> } >>>>>>>>> >>>>>>>>> private def getOuterJoinDF = { >>>>>>>>> val leftOuter = spark.table("outer").select( >>>>>>>>> $"strleft", when(isnull($"pkLeftt"), floor(rand() * >>>>>>>>> Literal(10000000L)). >>>>>>>>> cast(LongType)). >>>>>>>>> otherwise($"pkLeftt").as("pkLeft")) >>>>>>>>> >>>>>>>>> val innerRight = spark.table("inner") >>>>>>>>> // val innerinnerRight = spark.table("innerinner") >>>>>>>>> >>>>>>>>> val outerjoin = leftOuter.hint("shuffle_hash"). >>>>>>>>> join(innerRight, $"pkLeft" === $"pkRight", "left_outer") >>>>>>>>> outerjoin >>>>>>>>> /* >>>>>>>>> val outerOuterJoin = outerjoin.hint("shuffle_hash"). >>>>>>>>> join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer") >>>>>>>>> outerOuterJoin >>>>>>>>> >>>>>>>>> */ >>>>>>>>> } >>>>>>>>> } >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Sure. I will send prototypical query tomorrow. Though its >>>>>>>>> difficult to simulate issue using unit test , but I think the issue is >>>>>>>>> Rdd.isIndeterminate is not returning true for the query. As a >>>>>>>>> result, on retry, the shuffle stage is not reattempted fully. >>>>>>>>> And rdd is not returning inDeterminate as true , is due to >>>>>>>>> ShuffleRdd not taking into account , as the ShuffleDependency is not >>>>>>>>> taking >>>>>>>>> into account inDeterminate nature of HashPartitioner. >>>>>>>>> Moreover the attribute ref provided to HashPartitioner though is >>>>>>>>> pointing to an inDeterminate alias, is having deterministic flag as >>>>>>>>> true ( >>>>>>>>> which in itself is logically correct). >>>>>>>>> So I feel apart from ShuffleDependency code change, all >>>>>>>>> expressions should have a lazy Boolean say containsIndeterministic >>>>>>>>> component. Which will be true if the expression is indeterministic or >>>>>>>>> contains any attribute ref which has is indeterministicComponent true. >>>>>>>>> >>>>>>>>> And on personal note.. thanks for your interest.. this is very >>>>>>>>> rare attitude. >>>>>>>>> Regards >>>>>>>>> Asif >>>>>>>>> >>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel < >>>>>>>>> angel.alvarez.pas...@gmail.com> wrote: >>>>>>>>> >>>>>>>>>> Hi Asif, >>>>>>>>>> >>>>>>>>>> Could you provide an example (code+dataset) to analize this? >>>>>>>>>> Looks interesting ... >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Regards, >>>>>>>>>> Ángel >>>>>>>>>> >>>>>>>>>> El dom, 26 ene 2025 a las 20:58, Asif Shahid (< >>>>>>>>>> asif.sha...@gmail.com>) escribió: >>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> On further thoughts, I concur that leaf expressions like >>>>>>>>>>> AttributeRefs can always be considered to be deterministic, as , >>>>>>>>>>> as a java >>>>>>>>>>> variable the value contained in it per iteration is invariant ( >>>>>>>>>>> except when >>>>>>>>>>> changed by some deterministic logic). So in that sense what I said >>>>>>>>>>> in the >>>>>>>>>>> above mail as that an issue is incorrect. >>>>>>>>>>> But I think that AttributeRef should have a boolean method which >>>>>>>>>>> tells, whether the value it represents is from an indeterminate >>>>>>>>>>> source or >>>>>>>>>>> not. 
>>>>>>>>>>> Regards
>>>>>>>>>>> Asif
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <
>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> While testing a use case where the query had an outer join
>>>>>>>>>>>> such that the joining key of the left outer table either had a
>>>>>>>>>>>> valid value or a random value (salting to avoid skew), the
>>>>>>>>>>>> case was reported to produce incorrect results on node
>>>>>>>>>>>> failure, with retry.
>>>>>>>>>>>> On debugging the code, I have found the following, which has
>>>>>>>>>>>> left me confused as to what Spark's strategy is for
>>>>>>>>>>>> indeterministic fields.
>>>>>>>>>>>> Some serious issues are:
>>>>>>>>>>>> 1) All the leaf expressions, like AttributeReference, are
>>>>>>>>>>>> always considered deterministic. This means that if an
>>>>>>>>>>>> attribute is pointing to an Alias which itself is
>>>>>>>>>>>> indeterministic, the attribute will still be considered
>>>>>>>>>>>> deterministic.
>>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>>>>> operator supports indeterministic values or not. Join is not
>>>>>>>>>>>> included in the list of supported operators, but it passes
>>>>>>>>>>>> even if the joining key is pointing to an indeterministic
>>>>>>>>>>>> alias. (When I tried fixing it, I found a plethora of
>>>>>>>>>>>> operators failing, like DeserializeToObject, LocalRelation,
>>>>>>>>>>>> etc., which are not supposed to contain indeterministic
>>>>>>>>>>>> attributes, because they are not in the list of supporting
>>>>>>>>>>>> operators.)
>>>>>>>>>>>> 3) The ShuffleDependency does not check for the
>>>>>>>>>>>> indeterministic nature of the partitioner (I fixed it locally
>>>>>>>>>>>> and then realized that there is bug #1 which needs to be fixed
>>>>>>>>>>>> too).
>>>>>>>>>>>>
>>>>>>>>>>>> The code in DAGScheduler / TaskSet / TaskScheduler etc. seems
>>>>>>>>>>>> to have been written keeping in mind the indeterministic
>>>>>>>>>>>> nature of the previous and current stages, so as to re-execute
>>>>>>>>>>>> previous stages as a whole instead of just the missing tasks,
>>>>>>>>>>>> but the above 3 points do not seem to support the code of
>>>>>>>>>>>> DAGScheduler / TaskScheduler.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Asif
>>>>>>>>>>>>
>>>>>>>>>>>>
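To illustrate the kind of propagation proposed earlier in the thread (an
attribute stays deterministic as a per-row value holder, yet remembers that
its source alias was indeterminate, via something like a lazy
containsIndeterministicComponent flag), here is a small self-contained Scala
sketch. The trait and class names are made up for illustration and are not
Spark's actual Expression API.

// Conceptual sketch only; not Spark's catalyst Expression hierarchy.
sealed trait Expr {
  def children: Seq[Expr]
  def deterministic: Boolean = true
  // True if this node, or anything it was derived from, is indeterminate.
  lazy val containsIndeterministicComponent: Boolean =
    !deterministic || children.exists(_.containsIndeterministicComponent)
}

// A non-deterministic leaf, standing in for rand()-based salting of a join key.
case class RandExpr() extends Expr {
  val children: Seq[Expr] = Nil
  override def deterministic: Boolean = false
}

case class Alias(child: Expr, name: String) extends Expr {
  val children: Seq[Expr] = Seq(child)
}

// An attribute reference stays deterministic as a value holder, but carries a
// marker saying whether the alias it points to had an indeterminate
// component, so a partitioner built on it can flag the shuffle as
// indeterminate.
case class AttrRef(name: String, fromIndeterminateSource: Boolean) extends Expr {
  val children: Seq[Expr] = Nil
  override lazy val containsIndeterministicComponent: Boolean = fromIndeterminateSource
}

object IndeterminacyPropagationSketch extends App {
  val saltedKey = Alias(RandExpr(), "pkLeft")
  val ref = AttrRef("pkLeft", saltedKey.containsIndeterministicComponent)
  assert(ref.deterministic)                    // the value itself is stable once computed
  assert(ref.containsIndeterministicComponent) // but its source is not reproducible on retry
}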