The bugrepro patch, when applied on current master, will show a failure with
incorrect results.
On the PR branch, it will pass.
The number of iterations in the test is 100.

Regards
Asif

On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com> wrote:

> Hi,
> Following up on this issue.
> The PR opened is:
> 1) https://github.com/apache/spark/pull/49708
> The PR, IMO, fixes the three issues which I have described below.
> The race condition fix requires the use of a read-write lock (a minimal
> sketch of the idea follows the links below).
>
> 2) Jira is:
> https://issues.apache.org/jira/browse/SPARK-51016
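>
> To make the locking idea concrete, here is a minimal, self-contained sketch
> (hypothetical names; illustrative only, not the PR's actual code): the many
> task-completion events take the read lock, while the single retry decision
> takes the write lock, so the check and the act become atomic.
>
> import java.util.concurrent.ConcurrentHashMap
> import java.util.concurrent.locks.ReentrantReadWriteLock
>
> object RwLockSketch {
>   private val rwLock = new ReentrantReadWriteLock()
>   private val finished = ConcurrentHashMap.newKeySet[Int]()
>
>   // Task completions may proceed concurrently under the read lock.
>   def onPartitionSuccess(p: Int): Unit = {
>     rwLock.readLock().lock()
>     try finished.add(p) finally rwLock.readLock().unlock()
>   }
>
>   // The retry decision is exclusive: no completion can land between the
>   // emptiness check and the decision to retry everything or abort.
>   def onFetchFailure(totalPartitions: Int): Unit = {
>     rwLock.writeLock().lock()
>     try {
>       if (finished.isEmpty) println(s"retry all partitions 0 until $totalPartitions")
>       else println("abort: some result partitions already committed")
>     } finally rwLock.writeLock().unlock()
>   }
> }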
>
> A checksum approach would, I believe, result in a greater number of retries;
> there is already a mechanism in Expression to identify indeterminism.
>
> In any case, there are multiple issues, which the opened PR addresses:
> 1) stage.isIndeterminate is buggy, as it is not able to identify
> indeterminate stages, and the fix spans files like ShuffleDependency, RDD,
> etc.
> 2) The race condition in DagScheduler (a minimal model of the window is
> sketched after this list):
>    a) Say task1, corresponding to partition P1 of an indeterminate result
> stage, fails with a fetch failure.
>    b) The code rightly checks that no partitions of the result stage have
> succeeded yet; instead of aborting by throwing an exception, it decides to
> retry.
>    c) During this retry window, while dependencies are checked and a new
> attempt is initiated for task1, task2, corresponding to partition P2 of the
> result stage, becomes successful.
>    d) The new attempt now sees n - 1 missing partitions instead of all of
> them, and that creates data corruption, as it is no longer retrying all
> partitions.
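>
> A minimal, self-contained model of that check-then-act window (hypothetical
> names; this is not the actual DagScheduler code): the "zero partitions
> processed" check and the resubmission are not atomic, so a success landing
> in between shrinks the set of partitions that gets retried.
>
> object RaceSketch {
>   private var finished: Set[Int] = Set.empty
>   private val totalPartitions = 2
>
>   def onPartitionSuccess(p: Int): Unit = finished += p
>
>   // inWindow stands for whatever runs between the check and the act,
>   // e.g. the scheduler processing task2's success event.
>   def onFetchFailure(inWindow: () => Unit): Unit = {
>     if (finished.isEmpty) {                 // check: full retry still safe
>       inWindow()                            // partition P2 succeeds here
>       val missing = (0 until totalPartitions).filterNot(finished)
>       println(s"retrying only $missing")    // prints Vector(0), not both
>     }
>   }
>
>   def main(args: Array[String]): Unit =
>     onFetchFailure(() => onPartitionSuccess(1))
> }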
>
> 3) The DagSchedulerSuite tests:
>      a) test("SPARK-25341: retry all the succeeding stages when the map
> stage is indeterminate")
>           has the following assertion:
>
> assert(failedStages.collect {
>   case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
> }.head.findMissingPartitions() == Seq(0))
>
> If my understanding is right, and as seen from the outcome of the opened
> PR, since the ShuffleMapStage turns out to be indeterminate, all tasks
> should be retried:
> so the expected value in the assertion should be Seq(0, 1).
>
> b) If #a above is valid, then the second test,
>       test("SPARK-32923: handle stage failure for indeterminate map stage
> with push-based shuffle"),
> which has the assertion:
>
>     assert(failedStages.collect {
>       case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>     }.head.findMissingPartitions() == Seq(0))
>
> should instead also expect Seq(0, 1).
>
>
> I am also attaching a patch which I was using as a basis to debug and fix.
> The patch modifies the source code, using inline code interception and
> tweaks, to reproduce the data corruption issue in a unit test.
>
> And I am also attaching the BugTest which I used.
>
> The test is supposed to pass if it either encounters an exception from the
> Spark layer with the message
> "Please eliminate the indeterminacy by checkpointing the RDD before
> repartition and try again",
> or the results of the retry are as expected (12 rows, along with a data
> correctness check).
>
> Meanwhile I will try to productize the bug test.
>
> I would appreciate clarification on test issue #3. And a review of the PR,
> if done, would be appreciated even more.
>
> Santosh,
> I believe my PR would do a full partition retry (if the fetch failure
> happened prior to any other final task succeeding; otherwise it will abort
> the query by throwing an exception). No wrong results, I believe.
> Your review will be much appreciated.
> Regards
> Asif
>
>
> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> > Have identified a race condition in the current DagScheduler code base
>>
>> Does it lead to the failure of full-stage retry? Can you also share your
>> PR so that we can take a closer look?
>>
>> BTW, I'm working with my colleagues on this runtime checksum idea. We
>> have something similar in our internal infra and I think we can create a
>> working solution shortly.
>>
>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <
>> santosh.ping...@adyen.com> wrote:
>>
>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage but
>>> the data becomes different (e.g. use checksum to check it), then Spark
>>> should retry all the partitions of this stage.
>>>
>>> Having been bitten hard recently by this behavior in Spark, and after
>>> having gone down the rabbit hole to investigate, I think we should
>>> definitely try to refine and implement this approach. The non-determinism
>>> depends on several factors, including the datatypes of the columns or the
>>> use of repartition; it is not easy for users to reason about this behavior.
>>>
>>> PS: The JIRA that I found, which helped me confirm my understanding of the
>>> problem and educate internal users and the platform team:
>>> https://issues.apache.org/jira/browse/SPARK-38388. The checksum approach
>>> was brought up in that JIRA too, and I feel it is the balanced way to look
>>> at this problem.
>>>
>>> Kind regards
>>> Santosh
>>>
>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> Following up on the issue with some information:
>>>> 1) I have opened a PR to fix the isIndeterminate method in Stage, RDD,
>>>> etc.
>>>> 2) After tweaking the source code, I have been able to reproduce the
>>>> data loss issue reliably, but cannot productize the test, as it requires a
>>>> lot of inline interception and coaxing to induce the failure in a single-VM
>>>> unit test. I will post a diff of the source code patch and the bug test.
>>>> 3) I have identified a race condition in the current DagScheduler code
>>>> base:
>>>>     In the current DagScheduler code, in the function handleTaskCompletion,
>>>> when a task fails, the failure is resubmitted as ResubmitFailedStages;
>>>> prior to submitting, the code checks whether the shuffle is indeterminate.
>>>> If it is, the stage is submitted ONLY IF the ResultStage has ZERO
>>>> partitions processed. That is right, but the operation is not atomic up
>>>> to the point where missingPartitions is invoked on the stage. As a result,
>>>> it is possible that one of the tasks from the current attempt marks a
>>>> partition as successful, and when the new attempt is made, instead of
>>>> seeing all partitions as missing, it fetches only some of them.
>>>> Will be modifying the PR to fix the race.
>>>> Regards
>>>> Asif
>>>>
>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Shouldn't it be possible to determine statically whether the output
>>>>> will be deterministic? Expressions already have a deterministic flag, so
>>>>> when an attribute is created from an alias, it should be possible to know
>>>>> whether the attribute points to an indeterminate component.
>>>>>
>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> It looks like a hard problem to statically analyze the query plan and
>>>>>> decide whether a Spark stage is deterministic or not. When I added the
>>>>>> RDD DeterministicLevel, I thought it was not a hard problem for the
>>>>>> callers to specify it, but it seems I was wrong.
>>>>>>
>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>>>> but the data becomes different (e.g. use checksum to check it), then 
>>>>>> Spark
>>>>>> should retry all the partitions of this stage. I'll look into this repro
>>>>>> after I'm back from the national holiday.
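>>>>>>
>>>>>> A minimal sketch of that runtime check (hypothetical names, not an
>>>>>> existing Spark API): record a checksum per map partition on the first
>>>>>> attempt; if any recomputed partition's checksum differs, treat the stage
>>>>>> output as indeterminate and retry all of its partitions.
>>>>>>
>>>>>> import java.util.zip.CRC32
>>>>>>
>>>>>> object ChecksumSketch {
>>>>>>   def checksum(partitionBytes: Array[Byte]): Long = {
>>>>>>     val crc = new CRC32()
>>>>>>     crc.update(partitionBytes)
>>>>>>     crc.getValue
>>>>>>   }
>>>>>>
>>>>>>   // True if any recomputed partition no longer matches the checksum
>>>>>>   // recorded for the first attempt.
>>>>>>   def mustRetryAll(original: Map[Int, Long], recomputed: Map[Int, Long]): Boolean =
>>>>>>     recomputed.exists { case (p, c) => original.get(p).exists(_ != c) }
>>>>>> }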
>>>>>>
>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> I am using the test below. As a unit test it will pass, since
>>>>>>> simulating executor loss in a single VM is difficult, but there is
>>>>>>> definitely a bug.
>>>>>>> If you check with a debugger, ShuffleStage.isDeterminate turns out to
>>>>>>> be true, though it clearly should not be.
>>>>>>> As a result, if you look at the DagScheduler, TaskSchedulerImpl, and
>>>>>>> TaskSet code, the shuffle stage will not be retried fully; only the
>>>>>>> missing task would be retried.
>>>>>>> This means that if, on retry, the join used a random value which puts a
>>>>>>> row in an already completed shuffle task, the results will be missed.
>>>>>>>
>>>>>>>> package org.apache.spark.sql.vectorized
>>>>>>>>
>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>>>
>>>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>>>   import testImplicits._
>>>>>>>>
>>>>>>>>   /* test("no retries") {
>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>       val baseDf = spark.createDataset(
>>>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), (null, "y"), (null, "z")))(
>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>
>>>>>>>>       val leftOuter = baseDf.select(
>>>>>>>>         $"strleft", when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
>>>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>>>       leftOuter.show(10000)
>>>>>>>>
>>>>>>>>       val innerRight = spark.createDataset(
>>>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>
>>>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
>>>>>>>>
>>>>>>>>       innerjoin.show(1000)
>>>>>>>>
>>>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>
>>>>>>>>       outerjoin.show(1000)
>>>>>>>>     }
>>>>>>>>   } */
>>>>>>>>
>>>>>>>>   test("with retries") {
>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>       withTable("outer", "inner") {
>>>>>>>>         createBaseTables()
>>>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>>>
>>>>>>>>         println("Initial data")
>>>>>>>>         outerjoin.show(1000)
>>>>>>>>         val correctRows = outerjoin.collect()
>>>>>>>>         for (i <- 0 until 100) {
>>>>>>>>           FileScanRDD.throwException = false
>>>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>>>           val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>>>>           println("after retry data")
>>>>>>>>           temp.show(1000)
>>>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>>>           correctRows.foreach(r => {
>>>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>>>                 (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && r.isNullAt(2) &&
>>>>>>>>                   x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
>>>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
>>>>>>>>             assert(index >= 0)
>>>>>>>>             retriedResults.remove(index)
>>>>>>>>           })
>>>>>>>>           assert(retriedResults.isEmpty)
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         // Thread.sleep(10000000)
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private def createBaseTables(): Unit = {
>>>>>>>>     /* val outerDf = spark.createDataset(
>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>     outerDf.write.format("parquet").saveAsTable("outer") */
>>>>>>>>
>>>>>>>>     val outerDf = spark.createDataset(
>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>>>     outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>>>
>>>>>>>>     /* val innerDf = spark.createDataset(
>>>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright") */
>>>>>>>>
>>>>>>>>     val innerDf = spark.createDataset(
>>>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>
>>>>>>>>     innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>>>
>>>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>>>
>>>>>>>>     // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private def getOuterJoinDF = {
>>>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>>>       $"strleft",
>>>>>>>>       when(isnull($"pkLeftt"), floor(rand() * Literal(10000000L)).cast(LongType)).
>>>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>>>
>>>>>>>>     val innerRight = spark.table("inner")
>>>>>>>>     // val innerinnerRight = spark.table("innerinner")
>>>>>>>>
>>>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>     outerjoin
>>>>>>>>     /*
>>>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>>>     outerOuterJoin
>>>>>>>>      */
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Sure. I will send a prototypical query tomorrow. Though it is
>>>>>>>> difficult to simulate the issue in a unit test, I think the issue is
>>>>>>>> that RDD.isIndeterminate is not returning true for the query. As a
>>>>>>>> result, on retry, the shuffle stage is not reattempted fully.
>>>>>>>> The RDD does not report itself as indeterminate because the ShuffleRDD,
>>>>>>>> via its ShuffleDependency, does not take into account the indeterminate
>>>>>>>> nature of the HashPartitioner.
>>>>>>>> Moreover, the attribute ref provided to the HashPartitioner, though
>>>>>>>> pointing to an indeterminate alias, has its deterministic flag set to
>>>>>>>> true (which in itself is logically correct).
>>>>>>>> So I feel that, apart from the ShuffleDependency code change, all
>>>>>>>> expressions should have a lazy Boolean, say containsIndeterministic,
>>>>>>>> which will be true if the expression is indeterministic or contains any
>>>>>>>> attribute ref whose indeterministic-component flag is true.
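>>>>>>>>
>>>>>>>> A rough sketch of that proposal (hypothetical names, not existing Spark
>>>>>>>> code; the alias-to-attribute resolution a real analyzer would need is
>>>>>>>> reduced here to a flag on the leaf):
>>>>>>>>
>>>>>>>> trait ExprSketch {
>>>>>>>>   def children: Seq[ExprSketch]
>>>>>>>>   def deterministic: Boolean = children.forall(_.deterministic)
>>>>>>>>   // Taints the whole tree if any node is non-deterministic or any
>>>>>>>>   // attribute resolves to an indeterminate alias.
>>>>>>>>   lazy val containsIndeterministic: Boolean =
>>>>>>>>     !deterministic || children.exists(_.containsIndeterministic)
>>>>>>>> }
>>>>>>>>
>>>>>>>> case class AttrRefSketch(fromIndeterminateSource: Boolean) extends ExprSketch {
>>>>>>>>   override def children: Seq[ExprSketch] = Nil
>>>>>>>>   override def deterministic: Boolean = true // logically correct, as noted
>>>>>>>>   override lazy val containsIndeterministic: Boolean = fromIndeterminateSource
>>>>>>>> }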
>>>>>>>>
>>>>>>>> And on a personal note: thanks for your interest; that is a rare
>>>>>>>> attitude.
>>>>>>>> Regards
>>>>>>>> Asif
>>>>>>>>
>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <angel.alvarez.pas...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Asif,
>>>>>>>>>
>>>>>>>>> Could you provide an example (code + dataset) to analyze this? Looks
>>>>>>>>> interesting ...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Ángel
>>>>>>>>>
>>>>>>>>> On Sun, Jan 26, 2025 at 8:58 PM, Asif Shahid (<
>>>>>>>>> asif.sha...@gmail.com>) wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> On further thought, I concur that leaf expressions like
>>>>>>>>>> AttributeRefs can always be considered deterministic, since, as a Java
>>>>>>>>>> variable, the value contained in one is invariant per iteration (except
>>>>>>>>>> when changed by some deterministic logic). So in that sense, what I
>>>>>>>>>> called an issue in the mail above is incorrect.
>>>>>>>>>> But I think AttributeRef should have a boolean method which tells
>>>>>>>>>> whether the value it represents comes from an indeterminate source or
>>>>>>>>>> not.
>>>>>>>>>> Regards
>>>>>>>>>> Asif
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <
>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>> While testing a use case where the query had an outer join such
>>>>>>>>>>> that the joining key of the left outer table had either a valid value
>>>>>>>>>>> or a random value (salting to avoid skew), the case was reported to
>>>>>>>>>>> produce incorrect results on node failure with retry.
>>>>>>>>>>> On debugging the code, I have found the following, which has left me
>>>>>>>>>>> confused as to what Spark's strategy for indeterministic fields is.
>>>>>>>>>>> Some serious issues are:
>>>>>>>>>>> 1) All the leaf expressions, like AttributeReference, are always
>>>>>>>>>>> considered deterministic. This means that if an attribute points to an
>>>>>>>>>>> Alias which is itself indeterministic, the attribute will still be
>>>>>>>>>>> considered deterministic.
>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>>>> operator supports indeterministic values or not. Join is not included
>>>>>>>>>>> in the list of supported operators, but the check passes even if the
>>>>>>>>>>> joining key points to an indeterministic alias. (When I tried fixing
>>>>>>>>>>> this, I found a plethora of operators failing, like DeserializedObject,
>>>>>>>>>>> LocalRelation, etc., which are not supposed to contain indeterministic
>>>>>>>>>>> attributes because they are not in the list of supporting operators.)
>>>>>>>>>>> 3) The ShuffleDependency does not check for the indeterministic
>>>>>>>>>>> nature of the partitioner (I fixed it locally and then realized that
>>>>>>>>>>> bug #1 needs to be fixed too); a condensed sketch is below.
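>>>>>>>>>>>
>>>>>>>>>>> A condensed sketch of point 3 (hypothetical names, not the actual
>>>>>>>>>>> ShuffleDependency code): an indeterminate partitioner should taint
>>>>>>>>>>> the shuffle output even when the upstream RDD is itself determinate.
>>>>>>>>>>>
>>>>>>>>>>> sealed trait DetLevel
>>>>>>>>>>> case object Determinate extends DetLevel
>>>>>>>>>>> case object Indeterminate extends DetLevel
>>>>>>>>>>>
>>>>>>>>>>> final class ShuffleDepSketch(
>>>>>>>>>>>     rddLevel: DetLevel,
>>>>>>>>>>>     partitionerIsDeterministic: Boolean) {
>>>>>>>>>>>   // The output level the scheduler should consult when deciding
>>>>>>>>>>>   // between a partial and a full stage retry.
>>>>>>>>>>>   def outputDeterministicLevel: DetLevel =
>>>>>>>>>>>     if (!partitionerIsDeterministic) Indeterminate else rddLevel
>>>>>>>>>>> }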
>>>>>>>>>>>
>>>>>>>>>>> The code in DagScheduler, TaskSet, TaskScheduler, etc. seems to
>>>>>>>>>>> have been written keeping in mind the indeterministic nature of the
>>>>>>>>>>> previous and current stages, so as to re-execute previous stages as a
>>>>>>>>>>> whole instead of just the missing tasks, but the three points above
>>>>>>>>>>> undermine that DagScheduler / TaskScheduler logic.
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> Asif
>>>>>>>>>>>
>>>>>>>>>>>
