The bugrepro patch, when applied on current master, fails with incorrect results; applied on the PR branch, it passes. The test runs 100 iterations.
Regards,
Asif

On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com> wrote:

Hi,
Following up on this issue.
The PR opened is:
1) https://github.com/apache/spark/pull/49708
In my opinion, the PR fixes the three issues I have described below. The race condition fix requires the use of a read-write lock.

2) The Jira is:
https://issues.apache.org/jira/browse/SPARK-51016

A checksum approach would, I believe, result in a greater number of retries; there is already a mechanism in Expression to identify indeterminism.

In any case, the issues are, I think, multiple, and the opened PR addresses them:

1) stage.isIndeterminate is buggy, as it is not able to identify indeterminate stages; the fix spans files such as ShuffleDependency, RDD, etc.

2) The race condition in DAGScheduler:
a) Say task1, corresponding to partition P1 of an indeterminate result stage, fails with a fetch failure.
b) The code rightly checks that no partitions of the result stage have succeeded yet; since none have, instead of aborting by throwing an exception, it decides to retry.
c) In the window between that check (plus the dependency check) and the new attempt initiated for task1, task2, corresponding to partition P2 of the result stage, completes successfully.
d) The new attempt now sees n - 1 missing partitions instead of all of them, and that creates data corruption, as it is no longer retrying all partitions.

3) The DAGSchedulerSuite tests:
a) The test "SPARK-25341: retry all the succeeding stages when the map stage is indeterminate" has the following assertion:

  assert(failedStages.collect {
    case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
  }.head.findMissingPartitions() == Seq(0))

If my understanding is right, and as seen from the outcome of the opened PR, since the ShuffleMapStage turns out to be indeterminate, all tasks should be retried, so the expected value in the assertion should be Seq(0, 1).

b) If #a above is valid, then the second test, "SPARK-32923: handle stage failure for indeterminate map stage with push-based shuffle", which has the assertion:

  assert(failedStages.collect {
    case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
  }.head.findMissingPartitions() == Seq(0))

should likewise expect Seq(0, 1).

I am also attaching a patch which I used as a basis to debug and fix. The patch modifies the source code, using inline code interception and tweaks, to reproduce the data-corruption issue in a unit test.

And I am also attaching the BugTest which I used.

The test is supposed to pass if it either encounters the exception from the Spark layer,
"Please eliminate the indeterminacy by checkpointing the RDD before repartition and try again",
or if the results of the retry are as expected (12 rows, along with a data-correctness check).

Meanwhile I will try to productize the bug test.

I would appreciate clarification on test issue #3. And a review of the PR, if done, would be appreciated even more.

Santosh,
I believe my PR would do a full-partition retry (if the fetch failure happened before any other final task succeeded; otherwise it will abort the query by throwing an exception). So no wrong results, I believe. Your review will be much appreciated.
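To make the window in #2 concrete, here is a minimal single-file sketch of the check-then-act gap (simplified, hypothetical names; this is not the actual DAGScheduler code):

// Hypothetical model of the race: the "zero partitions finished" check and
// the later computation of missing partitions are not one atomic step, so a
// success event can slip in between them.
object RaceWindowSketch {
  import java.util.concurrent.ConcurrentHashMap

  final class ResultStageState(val numPartitions: Int) {
    private val finished = ConcurrentHashMap.newKeySet[Int]()
    def markFinished(p: Int): Unit = finished.add(p)
    def numFinished: Int = finished.size
    def missingPartitions: Seq[Int] =
      (0 until numPartitions).filterNot(finished.contains)
  }

  def main(args: Array[String]): Unit = {
    val stage = new ResultStageState(numPartitions = 2)

    // Task 1 (partition 1) reports a fetch failure; the scheduler checks:
    val safeToRetry = stage.numFinished == 0   // true: nothing finished yet

    // ... window: task 2 (partition 0) completes before the retry is built ...
    stage.markFinished(0)

    if (safeToRetry) {
      // For an indeterminate stage this must be Seq(0, 1), but we get Seq(1):
      println(s"retrying only partitions ${stage.missingPartitions}")
    }
  }
}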
Regards,
Asif

On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com> wrote:

> Have identified a race condition in the current DAGScheduler code base

Does it lead to the failure of the full-stage retry? Can you also share your PR so that we can take a closer look?

BTW, I'm working with my colleagues on this runtime checksum idea. We have something similar in our internal infra, and I think we can create a working solution shortly.

On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <santosh.ping...@adyen.com> wrote:

> Maybe we should do it at runtime: if Spark retries a shuffle stage but the data becomes different (e.g. use checksum to check it), then Spark should retry all the partitions of this stage.

Having been bitten hard by this behavior in Spark recently, and after having gone down the rabbit hole to investigate, we should definitely try to refine and implement this approach. The non-determinism depends on several factors, including the datatypes of the columns or the use of repartition; it is not easy for users to reason about this behavior.

PS: The JIRA that I found, which helped me confirm my understanding of the problem and educate internal users and the platform team: https://issues.apache.org/jira/browse/SPARK-38388. The checksum approach was brought up in that JIRA too, and I feel it is the balanced way to look at this problem.

Kind regards,
Santosh

On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com> wrote:

Hi,
Following up on the issue with some information:
1) I have opened a PR to fix the isIndeterminate method in Stage, RDD, etc.
2) After tweaking the source code, I have been able to reproduce the data loss issue reliably, but I cannot productize the test, as it requires a lot of inline interception and coaxing to induce the failure in a single-VM unit test. I will post a diff of the source-code patch and the bug test.
3) I have identified a race condition in the current DAGScheduler code base:
In the current DAGScheduler code, in the function handleTaskCompletion, when a task fails and the failure is resubmitted as a ResubmitFailedStages event, the code first checks whether the shuffle is indeterminate. If it is, the stage is submitted ONLY IF the ResultStage has ZERO partitions processed. That is correct, but the operation is not atomic up to the point where missingPartitions is invoked on the stage. As a result, it is possible that one of the tasks from the current attempt marks a partition as successful, and the new attempt, instead of seeing all partitions as missing, fetches only some of them.
I will be modifying the PR to fix the race.
Regards,
Asif

On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com> wrote:

Shouldn't it be possible to determine statically whether the output will be deterministic? Expressions already have a deterministic flag, so when an attribute is created from an alias, it should be possible to know whether the attribute points to an indeterminate component.
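For example, the gap is visible today from the public API (a sketch; assumes a SparkSession `spark` in scope, behavior as I understand it):

import org.apache.spark.sql.functions._

// A non-deterministic projection: the Alias is correctly flagged ...
val df = spark.range(5).select(floor(rand() * lit(10000000L)).as("salt"))
println(df.queryExecution.analyzed.expressions.head.deterministic)  // false

// ... but the attribute that downstream operators see is "deterministic":
println(df.select(col("salt")).queryExecution.analyzed
  .output.head.deterministic)                                       // true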
On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com> wrote:

It looks like a hard problem to statically analyze the query plan and decide whether a Spark stage is deterministic or not. When I added RDD DeterministicLevel, I thought it was not a hard problem for the callers to specify it, but it seems I was wrong.

Maybe we should do it at runtime: if Spark retries a shuffle stage but the data becomes different (e.g. use a checksum to check it), then Spark should retry all the partitions of this stage. I'll look into this repro after I'm back from the national holiday.
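Sketched, the retry policy could be as simple as the following (illustrative only; none of this is an existing Spark API):

import java.util.zip.CRC32

// Checksum a map task's serialized output blocks.
def checksum(blocks: Iterator[Array[Byte]]): Long = {
  val crc = new CRC32()
  blocks.foreach(crc.update)
  crc.getValue
}

// On a retry, compare against the checksum recorded for the previous attempt:
// stable output => retry only the failed partitions; changed output => the
// stage is effectively indeterminate, so retry everything.
def partitionsToRetry(failedParts: Seq[Int], allParts: Seq[Int],
    previousSum: Long, retrySum: Long): Seq[Int] =
  if (retrySum == previousSum) failedParts else allParts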
On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com> wrote:

I am using the test below. As a unit test it will pass, since simulating executor loss in a single VM is difficult, but there is definitely a bug. If you check with a debugger, ShuffleStage.isDeterminate turns out to be true, though it clearly should not be. As a result, if you look at the DAGScheduler, TaskSchedulerImpl and TaskSet code, the shuffle stage is not retried fully; only the missing task is retried. Which means that on retry, if the join used a random value which puts a row in an already-completed shuffle task, the results will be missed.

package org.apache.spark.sql.vectorized

import org.apache.spark.rdd.ZippedPartitionsRDD2
import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.datasources.FileScanRDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.LongType

class BugTest extends QueryTest with SharedSparkSession {
  import testImplicits._

  /* test("no retries") {
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      val baseDf = spark.createDataset(
        Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), (null, "y"), (null, "z")))(
        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")

      val leftOuter = baseDf.select(
        $"strleft",
        when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
          otherwise($"pkLeft").as("pkLeft"))
      leftOuter.show(10000)

      val innerRight = spark.createDataset(
        Seq((1L, "11"), (2L, "22"), (3L, "33")))(
        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")

      val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
      innerjoin.show(1000)

      val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
      outerjoin.show(1000)
    }
  } */

  test("with retries") {
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      withTable("outer", "inner") {
        createBaseTables()
        val outerjoin: DataFrame = getOuterJoinDF

        println("Initial data")
        outerjoin.show(1000)
        val correctRows = outerjoin.collect()
        for (i <- 0 until 100) {
          // Flags added by the accompanying source patch to inject a failure
          // mid-query and force a stage retry.
          FileScanRDD.throwException = false
          ZippedPartitionsRDD2.throwException = true
          val rowsAfterRetry = getOuterJoinDF.collect()
          import scala.jdk.CollectionConverters._
          val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
          println("after retry data")
          temp.show(1000)
          assert(correctRows.length == rowsAfterRetry.length)
          // Every expected row must be matched exactly once in the retried result.
          val retriedResults = rowsAfterRetry.toBuffer
          correctRows.foreach { r =>
            val index = retriedResults.indexWhere(x =>
              r.getString(0) == x.getString(0) &&
                (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && r.isNullAt(2) &&
                  x.isNullAt(3) && r.isNullAt(3))) &&
                ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
                ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
            assert(index >= 0)
            retriedResults.remove(index)
          }
          assert(retriedResults.isEmpty)
        }
        // Thread.sleep(10000000)
      }
    }
  }

  private def createBaseTables(): Unit = {
    /* val outerDf = spark.createDataset(
      Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
      Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
    outerDf.write.format("parquet").saveAsTable("outer") */

    val outerDf = spark.createDataset(
      Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
      Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
    outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")

    /* val innerDf = spark.createDataset(
      Seq((1L, "11"), (2L, "22"), (3L, "33")))(
      Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright") */

    val innerDf = spark.createDataset(
      Seq((1L, "11"), (2L, "11"), (3L, "33")))(
      Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
    innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")

    val innerInnerDf = spark.createDataset(
      Seq((1L, "111"), (2L, "222"), (3L, "333")))(
      Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
    // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
  }
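  // Note: the join key built below is either the real pkLeftt or a fresh
  // per-row random salt. If only a subset of shuffle map tasks is re-executed
  // after a fetch failure, the re-run tasks draw new salts while the surviving
  // map outputs keep the old ones, so a salted row can hash to a partition
  // that is never rebuilt -- the data-loss mode this test probes for.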
  private def getOuterJoinDF = {
    val leftOuter = spark.table("outer").select(
      $"strleft",
      when(isnull($"pkLeftt"), floor(rand() * Literal(10000000L)).cast(LongType)).
        otherwise($"pkLeftt").as("pkLeft"))

    val innerRight = spark.table("inner")
    // val innerinnerRight = spark.table("innerinner")

    val outerjoin = leftOuter.hint("shuffle_hash").
      join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
    outerjoin
    /*
    val outerOuterJoin = outerjoin.hint("shuffle_hash").
      join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
    outerOuterJoin
    */
  }
}

On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com> wrote:

Sure, I will send a prototypical query tomorrow. Though it is difficult to simulate the issue in a unit test, I think the problem is that RDD.isIndeterminate is not returning true for the query. As a result, on retry, the shuffle stage is not reattempted fully.
And the RDD not returning indeterminate as true is because the ShuffleDependency does not take into account the indeterminate nature of the HashPartitioner. Moreover, the attribute ref provided to the HashPartitioner, though pointing to an indeterminate alias, has its deterministic flag set to true (which in itself is logically correct).
So I feel that, apart from the ShuffleDependency code change, all expressions should have a lazy boolean, say containsIndeterministicComponent, which would be true if the expression is indeterministic or contains any attribute ref whose indeterministic-component flag is true.

And on a personal note: thanks for your interest; this is a very rare attitude.
Regards,
Asif

On Sun, Jan 26, 2025, 9:45 PM Ángel <angel.alvarez.pas...@gmail.com> wrote:

Hi Asif,

Could you provide an example (code + dataset) to analyze this? Looks interesting...

Regards,
Ángel

On Sun, Jan 26, 2025 at 8:58 PM Asif Shahid <asif.sha...@gmail.com> wrote:

Hi,
On further thought, I concur that leaf expressions like AttributeRefs can always be considered deterministic, since, as a Java variable, the value contained in them per iteration is invariant (except when changed by some deterministic logic). So in that sense, what I described in the above mail as an issue is incorrect.
But I think that AttributeRef should have a boolean method which tells whether the value it represents comes from an indeterminate source or not.
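Something of this shape, say (a sketch of the idea with hypothetical names; not Catalyst's actual API):

// Sketch: keep per-row determinism of attribute references, but carry a
// separate "comes from an indeterminate source" bit through them.
trait ExprLike {
  def children: Seq[ExprLike]
  def nonDeterministicHere: Boolean = false    // e.g. a rand() call
  def sourceIsIndeterminate: Boolean = false   // set on attribute refs
  final def hasIndeterminateComponent: Boolean =
    nonDeterministicHere || sourceIsIndeterminate ||
      children.exists(_.hasIndeterminateComponent)
}

// An attribute ref stays deterministic per row, yet remembers its lineage:
final case class AttrRef(name: String, override val sourceIsIndeterminate: Boolean)
    extends ExprLike {
  def children: Seq[ExprLike] = Nil
}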
Regards,
Asif

On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <asif.sha...@gmail.com> wrote:

Hi,
While testing a use case where the query had an outer join such that the joining key of the left outer table had either a valid value or a random value (salting to avoid skew), the case was reported to produce incorrect results on node failure with retry.
On debugging the code, I have found the following, which has left me confused as to what Spark's strategy is for indeterministic fields. Some serious issues are:
1) All the leaf expressions, like AttributeReference, are always considered deterministic. This means that if an attribute points to an Alias which is itself indeterministic, the attribute will still be considered deterministic.
2) In CheckAnalysis there is code which checks whether each operator supports indeterministic values or not. Join is not included in the list of supported operators, yet the check passes even if the joining key points to an indeterministic alias. (When I tried fixing it, I found a plethora of operators failing, like DeserializeToObject, LocalRelation, etc., which are not supposed to contain indeterministic attributes, because they are not in the list of supporting operators.)
3) The ShuffleDependency does not check for the indeterministic nature of the partitioner (I fixed this locally and then realized that bug #1 needs to be fixed too).

The code in DAGScheduler, TaskSet, TaskScheduler, etc. seems to have been written keeping in mind the indeterministic nature of the previous and current stages, so as to re-execute previous stages as a whole instead of just the missing tasks, but the above three points do not seem to support the DAGScheduler / TaskScheduler code.

Regards,
Asif
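P.S. The shape of the query in question, minimally (a sketch; table and column names are made up, and a SparkSession `spark` is assumed):

import org.apache.spark.sql.functions._

// Left side: salt null keys so the skewed nulls spread across partitions.
val left = spark.table("facts").select(
  col("payload"),
  when(col("pk").isNull, floor(rand() * lit(10000000L)).cast("long"))
    .otherwise(col("pk")).as("joinKey"))

// A failure that re-runs only some map tasks re-rolls the salts for those
// tasks alone, which is where the incorrect results come from.
val joined = left.join(spark.table("dims"),
  col("joinKey") === col("dimPk"), "left_outer")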