Hi Wenchen,
Apologies... I thought it was Santosh asking for the PR. and race
condition...
Yes the race condition causes only some partitions to be retried ..
May be a checksum can be used as fallback mechanism?
I suppose checksum would be implemented on executor sides..? Then there
might be condition where shuffle files could be lost before
driver/executors are communicated checksum ?
Regards
Asif


On Thu, Feb 13, 2025 at 7:39 PM Asif Shahid <asif.sha...@gmail.com> wrote:

> The bugrepro patch , when applied on current master, will show failure
> with incorrect results.
> While on the PR branch , it will pass.
> The number of iterations in the test is 100.
>
> Regards
> Asif
>
> On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>
>> Hi,
>> Following up on this issue.
>> The PR opened is:
>> 1)https://github.com/apache/spark/pull/49708
>> The PR IMO fixes the 3 issues which I have described below.
>> The race condition fix requires use of read write lock.
>>
>> 2) Jira is:
>> https://issues.apache.org/jira/browse/SPARK-51016
>>
>> A checksum approach would result in greater number of retries, I believe:
>> There is already mechanism in Expression to identify indeterminism
>>
>> Anyways, the issues which I think are , multiple, which the opened PR
>> addresses.
>> 1)  The stage.isIndeterminate is buggy as it is not able to identify
>> inDeterminate, and the fix  spans some files like ShuffleDependency, RDD
>> etc.
>> 2) The race condition in DagScheduler:
>>    a) Say a task1 corresponding to Partition P1 of an inDeterminate Stage
>> Result fails with Fetch Failure
>>    b) The code rightly checks if no previous partitions of result stage
>> are successful yet, instead of throwing  Exception by aborting , it decides
>> to retry.
>>     c) During this window of retry, to check for dependencies and making
>> a new attempt initiated by Task1, Task2 corresponding to Partition 2 of
>> result stage becomes successful.
>>    d) The new attempt, now sees instead of all missing partitions, sees n
>> -1.  and  that creates  Data corruption. as it is no longer retrying on all
>> partitions.
>>
>> 3)  The DagSchedulerSuite's test
>>      a)    test("SPARK-25341: retry all the succeeding stages when the
>> map stage is indeterminate")
>>           has following assertion:
>>
>> assert(failedStages.collect {
>>       case stage: ShuffleMapStage if stage.shuffleDep.shuffleId ==
>> shuffleId2 => stage
>>     }.head.findMissingPartitions() == Seq(0))
>>
>> If my understanding is right and as seen from the outcome of the opened
>> PR, as the ShuffleMapSage is turning out to be inDeterminate, all tasks
>> should be retried:
>> so the assertion should be , as expected value Seq(0, 1)
>>
>> b) If  the above #a is valid, then the second test
>>       test("SPARK-32923: handle stage failure for indeterminate map stage
>> with push-based shuffle") {
>> which has assertion:
>>
>>          assert(failedStages.collect {
>>       case stage: ShuffleMapStage if stage.shuffleDep.shuffleId ==
>> shuffleId2 => stage
>>     }.head.findMissingPartitions() == Seq(0))
>>
>> should also be instead,  Seq(0, 1)
>>
>>
>> I am also attaching a patch which I was using a basis to debug and fix.
>> The patch modifies the source code by using inline code interception and
>> tweaks to reproduce the issue of data corruption using a unit test.
>>
>> And I am also attaching the BugTest which I used.
>>
>> The test is supposed to pass  if either it encounters exception from
>> spark layer,  due to exception
>> "Please eliminate the ndeterminacy by checkpointing the RDD before
>> repartition and try again"
>> or if the results of retry are as expected ( 12 rows along witrh data
>> correctness check)
>>
>> Meanwhile I will try to productize the bug test.
>>
>> Would appreciate clarification on Test issue #3. And a review of PR, if
>> done, would be appreciate much more.
>>
>> Sanotsh,
>> I believe my PR would do a full partition retry ( if the Fetch failed
>> happend prior to any other final task being successful, else it will abort
>> query throwing Exception). No wrong result. I believe.
>> Your review will be much appreciated.
>> Regards
>> Asif
>>
>>
>> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>>> > Have identified a race condition , in the current DagSchdeduler code
>>> base
>>>
>>> Does it lead to the failure of full-stage retry? Can you also share your
>>> PR so that we can take a closer look?
>>>
>>> BTW, I'm working with my colleagues on this runtime checksum idea. We
>>> have something similar in our internal infra and I think we can create a
>>> working solution shortly.
>>>
>>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <
>>> santosh.ping...@adyen.com> wrote:
>>>
>>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>> but the data becomes different (e.g. use checksum to check it), then Spark
>>>> should retry all the partitions of this stage.
>>>>
>>>> Having bitten hard recently by this behavior in spark and after having
>>>> gone down the rabbit hole to investigate, we should definitely try to
>>>> refine and implement this approach. The non-deterministicness is dependent
>>>> on several factors, including datatypes of the columns or use of
>>>> repartition; it is not easy for the users the reason about this behavior.
>>>>
>>>> PS: The JIRA that I found and helped me confirm my understanding of the
>>>> problem, plus educate internal users and platform team:
>>>> https://issues.apache.org/jira/browse/SPARK-38388. Checksum approach
>>>> was brought up in that JIRA too and I feel that is the balanced way to look
>>>> at this problem.
>>>>
>>>> Kind regards
>>>> Santosh
>>>>
>>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Following up on the issue with some information:
>>>>> 1) have opened a PR related to fix the isInDeterminate method in Stage
>>>>> , RDD etc.
>>>>> 2) After tweaking the source code , have been able to reproduce the
>>>>> data loss issue reliably, but cannot productize the test, as it requires a
>>>>> lot of inline interception and coaxing to induce test failure in a single
>>>>> VM unit test. will post a diff of the source code patch and bug test.
>>>>> 3) Have identified a race condition , in the current
>>>>> DagSchdeduler code base:
>>>>>     In the current DagScheduler code, function
>>>>> *handleTaskCompletion, * when a task fails, while resubmitting the
>>>>> failure as ResubmitFailedStage, prior to submitting, it checks if the
>>>>> shuffle is inDeterminate.   If it is , then stage is submitted ONLY IF the
>>>>> ResultStage has ZERO partitions processed. Which is right,  But it is not 
>>>>> a
>>>>> atomic operation till the point , where missingPartitions from stage is
>>>>> invoked.  As a result it is possible that one of the task from the  
>>>>> current
>>>>> attempt, marks one of the partition as successful, and while the new
>>>>> attempt is made, instead of getting missing partitions as  total
>>>>> partitions,  it fetches only some of them.
>>>>> Will be modifying the PR to fix the race.
>>>>> Regards
>>>>> Asif
>>>>>
>>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Shouldn't it be possible to determine with static data , if output
>>>>>> will be deterministic ?. Expressions already have deterministic flag. So
>>>>>> when an attribute is created from alias, it will be possible to know if
>>>>>> attribute is pointing to an inDeterminate component.
>>>>>>
>>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> It looks like a hard problem to statically analyze the query plan
>>>>>>> and decide whether a Spark stage is deterministic or not. When I added
>>>>>>> RDD DeterministicLevel, I thought it was not a hard problem for the 
>>>>>>> callers
>>>>>>> to specify it, but seems I was wrong.
>>>>>>>
>>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>>>>> but the data becomes different (e.g. use checksum to check it), then 
>>>>>>> Spark
>>>>>>> should retry all the partitions of this stage. I'll look into this repro
>>>>>>> after I'm back from the national holiday.
>>>>>>>
>>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> I am using below test.  as a unit test though , it will pass, as
>>>>>>>> to simulate executor lost in a single vm is difficult, but there is
>>>>>>>> definitely a bug.
>>>>>>>> Using debugger if you check, the ShuffleStage.isDeterminate is
>>>>>>>> turning out to be true, though it clearly should not be.
>>>>>>>> As result if you look at DagScheduler, TaskSchedulerImpl and
>>>>>>>> TaskSet code, it will not retry the ShuffleStage fully, instead would 
>>>>>>>> only
>>>>>>>> retry missing task.
>>>>>>>> Which means on retry if the join used a random value which puts it
>>>>>>>> in already completed shuffle task, the results will be missed.
>>>>>>>>
>>>>>>>>> package org.apache.spark.sql.vectorized
>>>>>>>>>
>>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>>>>
>>>>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>>>>   import testImplicits._
>>>>>>>>>  /* test("no retries") {
>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>       val baseDf = spark.createDataset(
>>>>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, 
>>>>>>>>> "x"), (null, "y"), (null, "z")))(
>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, 
>>>>>>>>> Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>
>>>>>>>>>       val leftOuter = baseDf.select(
>>>>>>>>>         $"strleft", when(isnull($"pkLeft"), 
>>>>>>>>> monotonically_increasing_id() + Literal(100)).
>>>>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>>>>       leftOuter.show(10000)
>>>>>>>>>
>>>>>>>>>       val innerRight = spark.createDataset(
>>>>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, 
>>>>>>>>> Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>
>>>>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === 
>>>>>>>>> $"pkRight", "inner")
>>>>>>>>>
>>>>>>>>>       innerjoin.show(1000)
>>>>>>>>>
>>>>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === 
>>>>>>>>> $"pkRight", "left_outer")
>>>>>>>>>
>>>>>>>>>       outerjoin.show(1000)
>>>>>>>>>     }
>>>>>>>>>   } */
>>>>>>>>>
>>>>>>>>>   test("with retries") {
>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>       withTable("outer", "inner") {
>>>>>>>>>         createBaseTables()
>>>>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>>>>
>>>>>>>>>         println("Initial data")
>>>>>>>>>         outerjoin.show(1000)
>>>>>>>>>         val correctRows = outerjoin.collect()
>>>>>>>>>         for( i <- 0 until 100) {
>>>>>>>>>          FileScanRDD.throwException = false
>>>>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>>>>           val temp = 
>>>>>>>>> spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>>>>>           println("after retry data")
>>>>>>>>>           temp.show(1000)
>>>>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>>>>           correctRows.foreach(r => {
>>>>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>>>>                 (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && 
>>>>>>>>> r.isNullAt(2) &&
>>>>>>>>>                   x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == 
>>>>>>>>> x.getLong(2)) &&
>>>>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) 
>>>>>>>>> == x.getString(3)))
>>>>>>>>>             assert(index >= 0)
>>>>>>>>>             retriedResults.remove(index)
>>>>>>>>>           }
>>>>>>>>>           )
>>>>>>>>>           assert(retriedResults.isEmpty)
>>>>>>>>>         }
>>>>>>>>>
>>>>>>>>>      //   Thread.sleep(10000000)
>>>>>>>>>       }
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   private def createBaseTables(): Unit = {
>>>>>>>>>     /*val outerDf = spark.createDataset(
>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, 
>>>>>>>>> "cc"), (null, "cc")))(
>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>>>>> Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>     outerDf.write.format("parquet").saveAsTable("outer")*/
>>>>>>>>>
>>>>>>>>>     val outerDf = spark.createDataset(
>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, 
>>>>>>>>> "bb"), (null, "bb")))(
>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>>>>> Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>>>>     
>>>>>>>>> outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>>>>
>>>>>>>>>     /*val innerDf = spark.createDataset(
>>>>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>>>>> Encoders.STRING)).toDF("pkRight", "strright")*/
>>>>>>>>>
>>>>>>>>>     val innerDf = spark.createDataset(
>>>>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>>>>> Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>
>>>>>>>>>     
>>>>>>>>> innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>>>>
>>>>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>>>>> Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>>>>
>>>>>>>>>   //  innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   private def getOuterJoinDF = {
>>>>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>>>>       $"strleft", when(isnull($"pkLeftt"), floor(rand() * 
>>>>>>>>> Literal(10000000L)).
>>>>>>>>>         cast(LongType)).
>>>>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>>>>
>>>>>>>>>    val innerRight = spark.table("inner")
>>>>>>>>>  //   val innerinnerRight = spark.table("innerinner")
>>>>>>>>>
>>>>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>     outerjoin
>>>>>>>>>     /*
>>>>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>>>>     outerOuterJoin
>>>>>>>>>
>>>>>>>>>      */
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sure. I will send prototypical query tomorrow. Though its
>>>>>>>>> difficult to simulate issue using unit test , but I think the issue is
>>>>>>>>> Rdd.isIndeterminate is not returning true for the query. As a
>>>>>>>>> result, on retry, the shuffle stage is not reattempted fully.
>>>>>>>>> And rdd is not returning inDeterminate as true , is due to
>>>>>>>>> ShuffleRdd not taking into account , as the ShuffleDependency is not 
>>>>>>>>> taking
>>>>>>>>> into account inDeterminate nature of HashPartitioner.
>>>>>>>>> Moreover the attribute ref provided to HashPartitioner though is
>>>>>>>>> pointing to an inDeterminate alias, is having deterministic flag as 
>>>>>>>>> true (
>>>>>>>>> which in itself is logically correct).
>>>>>>>>> So I feel apart from ShuffleDependency code change, all
>>>>>>>>> expressions should have a lazy Boolean say containsIndeterministic
>>>>>>>>> component. Which will be true if the expression is indeterministic or
>>>>>>>>> contains any attribute ref which has is indeterministicComponent true.
>>>>>>>>>
>>>>>>>>> And on personal note.. thanks for your interest..  this is very
>>>>>>>>> rare attitude.
>>>>>>>>> Regards
>>>>>>>>> Asif
>>>>>>>>>
>>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <
>>>>>>>>> angel.alvarez.pas...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Asif,
>>>>>>>>>>
>>>>>>>>>> Could you provide an example (code+dataset) to analize this?
>>>>>>>>>> Looks interesting ...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Ángel
>>>>>>>>>>
>>>>>>>>>> El dom, 26 ene 2025 a las 20:58, Asif Shahid (<
>>>>>>>>>> asif.sha...@gmail.com>) escribió:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> On further thoughts, I concur that leaf expressions like
>>>>>>>>>>> AttributeRefs can always be considered to be  deterministic, as , 
>>>>>>>>>>> as a java
>>>>>>>>>>> variable the value contained in it per iteration is invariant ( 
>>>>>>>>>>> except when
>>>>>>>>>>> changed by some deterministic logic). So in that sense what I said 
>>>>>>>>>>> in the
>>>>>>>>>>> above mail as that an issue is incorrect.
>>>>>>>>>>> But I think that AttributeRef should have a boolean method which
>>>>>>>>>>> tells, whether the value it represents is from an indeterminate 
>>>>>>>>>>> source or
>>>>>>>>>>> not.
>>>>>>>>>>> Regards
>>>>>>>>>>> Asif
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <
>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> While testing a use case where the query had an outer join such
>>>>>>>>>>>> that joining key of left outer table either had a valid value or a 
>>>>>>>>>>>> random
>>>>>>>>>>>> value( salting to avoid skew).
>>>>>>>>>>>> The case was reported to have incorrect results in case of node
>>>>>>>>>>>> failure, with retry.
>>>>>>>>>>>> On debugging the code, have found following, which has left me
>>>>>>>>>>>> confused as to what is spark's strategy for indeterministic fields.
>>>>>>>>>>>> Some serious issues are :
>>>>>>>>>>>> 1) All the leaf expressions, like AttributeReference  are
>>>>>>>>>>>> always considered deterministic. Which means if an attribute is 
>>>>>>>>>>>> pointing to
>>>>>>>>>>>> an Alias which itself is indeterministic,  the attribute will 
>>>>>>>>>>>> still be
>>>>>>>>>>>> considered deterministic
>>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>>>>> Operator either supports indeterministic value or not . Join is not
>>>>>>>>>>>> included in the list of supported, but it passes even if the 
>>>>>>>>>>>> joining key is
>>>>>>>>>>>> pointing to an indeterministic alias. ( When I tried fixing it, 
>>>>>>>>>>>> found a
>>>>>>>>>>>> plethora of operators failing Like DeserializedObject, 
>>>>>>>>>>>> LocalRelation etc
>>>>>>>>>>>> which are not supposed to contain indeterministic attributes ( 
>>>>>>>>>>>> because they
>>>>>>>>>>>> are not in the list of supporting operators).
>>>>>>>>>>>> 3) The ShuffleDependency does not check for indeterministic
>>>>>>>>>>>> nature of partitioner ( fixed it locally and then realized that 
>>>>>>>>>>>> there is
>>>>>>>>>>>> the bug #1 which needs to be fixed too).
>>>>>>>>>>>>
>>>>>>>>>>>> The code in DagScheduler / TaskSet, TaskScheduler etc, seems to
>>>>>>>>>>>> have been written , keeping in mind the indeterministic nature of 
>>>>>>>>>>>> the
>>>>>>>>>>>> previous and current stages , so as to rexecute previous stages as 
>>>>>>>>>>>> a whole,
>>>>>>>>>>>> instead of just missing tasks, but the above  3 points, do not 
>>>>>>>>>>>> seem to
>>>>>>>>>>>> support the code of DagScheduler / TaskScheduler.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Asif
>>>>>>>>>>>>
>>>>>>>>>>>>

Reply via email to