> Have identified a race condition , in the current DagSchdeduler code base

Does it lead to the failure of full-stage retry? Can you also share your PR
so that we can take a closer look?

BTW, I'm working with my colleagues on this runtime checksum idea. We have
something similar in our internal infra and I think we can create a working
solution shortly.

On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <santosh.ping...@adyen.com>
wrote:

> > Maybe we should do it at runtime: if Spark retries a shuffle stage but
> the data becomes different (e.g. use checksum to check it), then Spark
> should retry all the partitions of this stage.
>
> Having bitten hard recently by this behavior in spark and after having
> gone down the rabbit hole to investigate, we should definitely try to
> refine and implement this approach. The non-deterministicness is dependent
> on several factors, including datatypes of the columns or use of
> repartition; it is not easy for the users the reason about this behavior.
>
> PS: The JIRA that I found and helped me confirm my understanding of the
> problem, plus educate internal users and platform team:
> https://issues.apache.org/jira/browse/SPARK-38388. Checksum approach was
> brought up in that JIRA too and I feel that is the balanced way to look at
> this problem.
>
> Kind regards
> Santosh
>
> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com> wrote:
>
>> Hi,
>> Following up on the issue with some information:
>> 1) have opened a PR related to fix the isInDeterminate method in Stage ,
>> RDD etc.
>> 2) After tweaking the source code , have been able to reproduce the data
>> loss issue reliably, but cannot productize the test, as it requires a lot
>> of inline interception and coaxing to induce test failure in a single VM
>> unit test. will post a diff of the source code patch and bug test.
>> 3) Have identified a race condition , in the current DagSchdeduler code
>> base:
>>     In the current DagScheduler code, function *handleTaskCompletion, * when
>> a task fails, while resubmitting the failure as ResubmitFailedStage, prior
>> to submitting, it checks if the shuffle is inDeterminate.   If it is , then
>> stage is submitted ONLY IF the ResultStage has ZERO partitions processed.
>> Which is right,  But it is not a atomic operation till the point , where
>> missingPartitions from stage is invoked.  As a result it is possible that
>> one of the task from the  current attempt, marks one of the partition as
>> successful, and while the new attempt is made, instead of getting missing
>> partitions as  total partitions,  it fetches only some of them.
>> Will be modifying the PR to fix the race.
>> Regards
>> Asif
>>
>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
>> wrote:
>>
>>> Shouldn't it be possible to determine with static data , if output will
>>> be deterministic ?. Expressions already have deterministic flag. So when an
>>> attribute is created from alias, it will be possible to know if attribute
>>> is pointing to an inDeterminate component.
>>>
>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>
>>>> It looks like a hard problem to statically analyze the query plan and
>>>> decide whether a Spark stage is deterministic or not. When I added
>>>> RDD DeterministicLevel, I thought it was not a hard problem for the callers
>>>> to specify it, but seems I was wrong.
>>>>
>>>> Maybe we should do it at runtime: if Spark retries a shuffle stage but
>>>> the data becomes different (e.g. use checksum to check it), then Spark
>>>> should retry all the partitions of this stage. I'll look into this repro
>>>> after I'm back from the national holiday.
>>>>
>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> I am using below test.  as a unit test though , it will pass, as  to
>>>>> simulate executor lost in a single vm is difficult, but there is 
>>>>> definitely
>>>>> a bug.
>>>>> Using debugger if you check, the ShuffleStage.isDeterminate is turning
>>>>> out to be true, though it clearly should not be.
>>>>> As result if you look at DagScheduler, TaskSchedulerImpl and TaskSet
>>>>> code, it will not retry the ShuffleStage fully, instead would only retry
>>>>> missing task.
>>>>> Which means on retry if the join used a random value which puts it in
>>>>> already completed shuffle task, the results will be missed.
>>>>>
>>>>>> package org.apache.spark.sql.vectorized
>>>>>>
>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>> import org.apache.spark.sql.functions._
>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>
>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>   import testImplicits._
>>>>>>  /* test("no retries") {
>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>       val baseDf = spark.createDataset(
>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), 
>>>>>> (null, "y"), (null, "z")))(
>>>>>>         Encoders.tupleEncoder(Encoders.LONG, 
>>>>>> Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>
>>>>>>       val leftOuter = baseDf.select(
>>>>>>         $"strleft", when(isnull($"pkLeft"), 
>>>>>> monotonically_increasing_id() + Literal(100)).
>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>       leftOuter.show(10000)
>>>>>>
>>>>>>       val innerRight = spark.createDataset(
>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>         Encoders.tupleEncoder(Encoders.LONG, 
>>>>>> Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>
>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === 
>>>>>> $"pkRight", "inner")
>>>>>>
>>>>>>       innerjoin.show(1000)
>>>>>>
>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === 
>>>>>> $"pkRight", "left_outer")
>>>>>>
>>>>>>       outerjoin.show(1000)
>>>>>>     }
>>>>>>   } */
>>>>>>
>>>>>>   test("with retries") {
>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>       withTable("outer", "inner") {
>>>>>>         createBaseTables()
>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>
>>>>>>         println("Initial data")
>>>>>>         outerjoin.show(1000)
>>>>>>         val correctRows = outerjoin.collect()
>>>>>>         for( i <- 0 until 100) {
>>>>>>          FileScanRDD.throwException = false
>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>           val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, 
>>>>>> outerjoin.schema)
>>>>>>           println("after retry data")
>>>>>>           temp.show(1000)
>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>           correctRows.foreach(r => {
>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>                 (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && 
>>>>>> r.isNullAt(2) &&
>>>>>>                   x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == 
>>>>>> x.getLong(2)) &&
>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == 
>>>>>> x.getString(3)))
>>>>>>             assert(index >= 0)
>>>>>>             retriedResults.remove(index)
>>>>>>           }
>>>>>>           )
>>>>>>           assert(retriedResults.isEmpty)
>>>>>>         }
>>>>>>
>>>>>>      //   Thread.sleep(10000000)
>>>>>>       }
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   private def createBaseTables(): Unit = {
>>>>>>     /*val outerDf = spark.createDataset(
>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, 
>>>>>> "cc"), (null, "cc")))(
>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>> Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>     outerDf.write.format("parquet").saveAsTable("outer")*/
>>>>>>
>>>>>>     val outerDf = spark.createDataset(
>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, 
>>>>>> "bb"), (null, "bb")))(
>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>> Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>     
>>>>>> outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>
>>>>>>     /*val innerDf = spark.createDataset(
>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>> Encoders.STRING)).toDF("pkRight", "strright")*/
>>>>>>
>>>>>>     val innerDf = spark.createDataset(
>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>> Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>
>>>>>>     
>>>>>> innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>
>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>       Encoders.tupleEncoder(Encoders.LONG, 
>>>>>> Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>
>>>>>>   //  innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>   }
>>>>>>
>>>>>>   private def getOuterJoinDF = {
>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>       $"strleft", when(isnull($"pkLeftt"), floor(rand() * 
>>>>>> Literal(10000000L)).
>>>>>>         cast(LongType)).
>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>
>>>>>>    val innerRight = spark.table("inner")
>>>>>>  //   val innerinnerRight = spark.table("innerinner")
>>>>>>
>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>     outerjoin
>>>>>>     /*
>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>     outerOuterJoin
>>>>>>
>>>>>>      */
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sure. I will send prototypical query tomorrow. Though its difficult
>>>>>> to simulate issue using unit test , but I think the issue is
>>>>>> Rdd.isIndeterminate is not returning true for the query. As a result,
>>>>>> on retry, the shuffle stage is not reattempted fully.
>>>>>> And rdd is not returning inDeterminate as true , is due to ShuffleRdd
>>>>>> not taking into account , as the ShuffleDependency is not taking into
>>>>>> account inDeterminate nature of HashPartitioner.
>>>>>> Moreover the attribute ref provided to HashPartitioner though is
>>>>>> pointing to an inDeterminate alias, is having deterministic flag as true 
>>>>>> (
>>>>>> which in itself is logically correct).
>>>>>> So I feel apart from ShuffleDependency code change, all expressions
>>>>>> should have a lazy Boolean say containsIndeterministic  component. Which
>>>>>> will be true if the expression is indeterministic or contains any 
>>>>>> attribute
>>>>>> ref which has is indeterministicComponent true.
>>>>>>
>>>>>> And on personal note.. thanks for your interest..  this is very rare
>>>>>> attitude.
>>>>>> Regards
>>>>>> Asif
>>>>>>
>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <angel.alvarez.pas...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Asif,
>>>>>>>
>>>>>>> Could you provide an example (code+dataset) to analize this? Looks
>>>>>>> interesting ...
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Ángel
>>>>>>>
>>>>>>> El dom, 26 ene 2025 a las 20:58, Asif Shahid (<asif.sha...@gmail.com>)
>>>>>>> escribió:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> On further thoughts, I concur that leaf expressions like
>>>>>>>> AttributeRefs can always be considered to be  deterministic, as , as a 
>>>>>>>> java
>>>>>>>> variable the value contained in it per iteration is invariant ( except 
>>>>>>>> when
>>>>>>>> changed by some deterministic logic). So in that sense what I said in 
>>>>>>>> the
>>>>>>>> above mail as that an issue is incorrect.
>>>>>>>> But I think that AttributeRef should have a boolean method which
>>>>>>>> tells, whether the value it represents is from an indeterminate source 
>>>>>>>> or
>>>>>>>> not.
>>>>>>>> Regards
>>>>>>>> Asif
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> While testing a use case where the query had an outer join such
>>>>>>>>> that joining key of left outer table either had a valid value or a 
>>>>>>>>> random
>>>>>>>>> value( salting to avoid skew).
>>>>>>>>> The case was reported to have incorrect results in case of node
>>>>>>>>> failure, with retry.
>>>>>>>>> On debugging the code, have found following, which has left me
>>>>>>>>> confused as to what is spark's strategy for indeterministic fields.
>>>>>>>>> Some serious issues are :
>>>>>>>>> 1) All the leaf expressions, like AttributeReference  are always
>>>>>>>>> considered deterministic. Which means if an attribute is pointing to 
>>>>>>>>> an
>>>>>>>>> Alias which itself is indeterministic,  the attribute will still be
>>>>>>>>> considered deterministic
>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>> Operator either supports indeterministic value or not . Join is not
>>>>>>>>> included in the list of supported, but it passes even if the joining 
>>>>>>>>> key is
>>>>>>>>> pointing to an indeterministic alias. ( When I tried fixing it, found 
>>>>>>>>> a
>>>>>>>>> plethora of operators failing Like DeserializedObject, LocalRelation 
>>>>>>>>> etc
>>>>>>>>> which are not supposed to contain indeterministic attributes ( 
>>>>>>>>> because they
>>>>>>>>> are not in the list of supporting operators).
>>>>>>>>> 3) The ShuffleDependency does not check for indeterministic nature
>>>>>>>>> of partitioner ( fixed it locally and then realized that there is the 
>>>>>>>>> bug
>>>>>>>>> #1 which needs to be fixed too).
>>>>>>>>>
>>>>>>>>> The code in DagScheduler / TaskSet, TaskScheduler etc, seems to
>>>>>>>>> have been written , keeping in mind the indeterministic nature of the
>>>>>>>>> previous and current stages , so as to rexecute previous stages as a 
>>>>>>>>> whole,
>>>>>>>>> instead of just missing tasks, but the above  3 points, do not seem to
>>>>>>>>> support the code of DagScheduler / TaskScheduler.
>>>>>>>>>
>>>>>>>>> Regards
>>>>>>>>> Asif
>>>>>>>>>
>>>>>>>>>

Reply via email to