Hi,
I have split my PR for the indeterminate stage bug into two.
The first one is for JIRA:
https://issues.apache.org/jira/browse/SPARK-51016
The PR is:
https://github.com/apache/spark/pull/50029

The above deals exclusively with the incorrect result returned by the
Stage.isIndeterminate function.
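
As an illustration of the underlying idea discussed earlier in this thread
(a simplified sketch only; the classes below are hypothetical stand-ins, not
Catalyst's expression classes and not the code in the PR): an attribute
reference stays deterministic on its own, but remembers whether the alias it
was created from contains an indeterministic component such as rand().

```scala
// Hypothetical miniature expression tree; these are NOT Catalyst's classes.
sealed trait Expr {
  def children: Seq[Expr]
  // Per-node flag, analogous to an expression's own "deterministic" flag.
  def deterministic: Boolean = true
  // True if this node, or anything below it, is indeterministic.
  lazy val hasIndeterministicComponent: Boolean =
    !deterministic || children.exists(_.hasIndeterministicComponent)
}

// Stand-in for a random-value expression such as rand().
case object Rand extends Expr {
  val children: Seq[Expr] = Nil
  override val deterministic: Boolean = false
}

final case class Lit(value: Long) extends Expr {
  val children: Seq[Expr] = Nil
}

// A leaf reference: deterministic by itself, but it carries the origin flag.
final case class AttrRef(name: String, fromIndeterministicSource: Boolean) extends Expr {
  val children: Seq[Expr] = Nil
  override lazy val hasIndeterministicComponent: Boolean = fromIndeterministicSource
}

final case class Alias(child: Expr, name: String) extends Expr {
  val children: Seq[Expr] = Seq(child)
  // The attribute created from the alias inherits the child's flag.
  def toAttribute: AttrRef = AttrRef(name, child.hasIndeterministicComponent)
}
```

With something like this, Alias(Rand, "pkLeft").toAttribute is still
deterministic as a leaf, yet reports hasIndeterministicComponent as true, so a
partitioner keyed on it could mark the shuffle as indeterminate.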

The second JIRA is:
https://issues.apache.org/jira/browse/SPARK-51272
The PR is:
https://github.com/apache/spark/pull/50033
This deals with the race condition where some partitions may not get
retried.
There is a new unit test in DAGSchedulerSuite which reproduces the race
reliably.
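
To make the race and the locking idea concrete, here is a minimal,
self-contained sketch. It is not the DAGScheduler code and not the code in
the PR; StageAttemptSketch and its methods are hypothetical names, and stale
results are simply dropped. Successful tasks take the read lock, while the
fetch-failure path takes the write lock, so the "no partition has finished
yet" check and the decision to retry all partitions form one atomic step.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantReadWriteLock

// Hypothetical stand-in for one indeterminate result stage (not Spark's Stage).
final class StageAttemptSketch(val numPartitions: Int) {
  private val lock = new ReentrantReadWriteLock()
  private val finished = ConcurrentHashMap.newKeySet[Int]()
  @volatile private var attempt = 0

  def currentAttempt: Int = attempt

  // Success path: shared (read) lock. A result that belongs to an older
  // attempt is dropped instead of shrinking the set of missing partitions.
  def markSuccess(partition: Int, taskAttempt: Int): Boolean = {
    lock.readLock().lock()
    try {
      if (taskAttempt != attempt) {
        false
      } else {
        finished.add(partition)
        true
      }
    } finally {
      lock.readLock().unlock()
    }
  }

  // Failure path: exclusive (write) lock. The check and the retry decision
  // cannot interleave with markSuccess.
  def onFetchFailure(): Either[String, Seq[Int]] = {
    lock.writeLock().lock()
    try {
      if (finished.isEmpty()) {
        attempt += 1
        Right(0 until numPartitions) // retry every partition
      } else {
        Left("abort: some partitions of an indeterminate stage already succeeded")
      }
    } finally {
      lock.writeLock().unlock()
    }
  }
}
```

In this sketch a task of attempt 0 that finishes after onFetchFailure() has
bumped the attempt is ignored, so the new attempt always covers all
partitions.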

For an end-to-end functional test, the attached bugrepro.patch and BugTest
can be used, but I cannot productize them because the repro relies on
intercepting the source code inline.
The bugrepro.patch with BugTest will pass if both of the above PRs are
included.

Regards
Asif


On Sun, Feb 16, 2025 at 9:43 AM Asif Shahid <asif.sha...@gmail.com> wrote:

> Hi,
> OK, I did the final check-in. Please feel free to review.
> Regards
> Asif
>
>
> On Sat, Feb 15, 2025 at 6:42 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>
>> Please hold off on reviewing the patch, as I need to do one more check-in.
>> I have still left a window for the race by releasing the read lock early,
>> in the case where the first task succeeds and the failing task has yet
>> to acquire the write lock.
>> Regards
>> Asif
>>
>> On Thu, Feb 13, 2025 at 10:48 PM Asif Shahid <asif.sha...@gmail.com>
>> wrote:
>>
>>> Hi Wenchen,
>>> Apologies, I thought it was Santosh asking about the PR and the race
>>> condition.
>>> Yes, the race condition causes only some partitions to be retried.
>>> Maybe a checksum can be used as a fallback mechanism?
>>> I suppose the checksum would be computed on the executor side? Then there
>>> might be a case where shuffle files are lost before the checksum is
>>> communicated to the driver/executors?
>>> Regards
>>> Asif
>>>
>>>
>>> On Thu, Feb 13, 2025 at 7:39 PM Asif Shahid <asif.sha...@gmail.com>
>>> wrote:
>>>
>>>> The bugrepro patch, when applied to current master, fails with
>>>> incorrect results,
>>>> while on the PR branch it passes.
>>>> The test runs 100 iterations.
>>>>
>>>> Regards
>>>> Asif
>>>>
>>>> On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Following up on this issue.
>>>>> 1) The PR opened is:
>>>>> https://github.com/apache/spark/pull/49708
>>>>> The PR, IMO, fixes the 3 issues which I have described below.
>>>>> The race condition fix requires the use of a read-write lock.
>>>>>
>>>>> 2) The JIRA is:
>>>>> https://issues.apache.org/jira/browse/SPARK-51016
>>>>>
>>>>> A checksum approach would result in a greater number of retries, I
>>>>> believe.
>>>>> There is already a mechanism in Expression to identify indeterminism.
>>>>>
>>>>> Anyway, I think there are multiple issues, which the opened PR
>>>>> addresses.
>>>>> 1) stage.isIndeterminate is buggy, as it fails to identify an
>>>>> indeterminate stage; the fix spans several files, such as
>>>>> ShuffleDependency, RDD, etc.
>>>>> 2) The race condition in DAGScheduler:
>>>>>    a) Say task1, corresponding to partition P1 of an indeterminate
>>>>> result stage, fails with a fetch failure.
>>>>>    b) The code rightly checks whether any partition of the result
>>>>> stage has already succeeded; since none has, instead of aborting with
>>>>> an exception, it decides to retry.
>>>>>    c) During this retry window, while dependencies are checked and
>>>>> the new attempt initiated by task1 is being made, task2, corresponding
>>>>> to partition 2 of the result stage, succeeds.
>>>>>    d) The new attempt now sees n - 1 missing partitions instead of
>>>>> all of them, and that causes data corruption, as it no longer retries
>>>>> all partitions.
>>>>>
>>>>> 3) The DAGSchedulerSuite tests:
>>>>>      a) test("SPARK-25341: retry all the succeeding stages when the
>>>>> map stage is indeterminate")
>>>>>           has the following assertion:
>>>>>
>>>>> assert(failedStages.collect {
>>>>>       case stage: ShuffleMapStage if stage.shuffleDep.shuffleId ==
>>>>> shuffleId2 => stage
>>>>>     }.head.findMissingPartitions() == Seq(0))
>>>>>
>>>>> If my understanding is right, and as seen from the outcome of the
>>>>> opened PR, the ShuffleMapStage turns out to be indeterminate, so all
>>>>> tasks should be retried
>>>>> and the expected value in the assertion should be Seq(0, 1).
>>>>>
>>>>> b) If #a above is valid, then the second test,
>>>>>       test("SPARK-32923: handle stage failure for indeterminate map
>>>>> stage with push-based shuffle") {
>>>>> which has the assertion:
>>>>>
>>>>>          assert(failedStages.collect {
>>>>>       case stage: ShuffleMapStage if stage.shuffleDep.shuffleId ==
>>>>> shuffleId2 => stage
>>>>>     }.head.findMissingPartitions() == Seq(0))
>>>>>
>>>>> should also expect Seq(0, 1) instead.
>>>>>
>>>>>
>>>>> I am also attaching the patch which I was using as a basis to debug and
>>>>> fix. The patch modifies the source code, using inline code interception
>>>>> and tweaks, to reproduce the data corruption issue with a unit test.
>>>>>
>>>>> I am also attaching the BugTest which I used.
>>>>>
>>>>> The test is supposed to pass either if it encounters the exception
>>>>> from the Spark layer with the message
>>>>> "Please eliminate the indeterminacy by checkpointing the RDD before
>>>>> repartition and try again"
>>>>> or if the results of the retry are as expected (12 rows, along with a
>>>>> data correctness check).
>>>>>
>>>>> Meanwhile, I will try to productize the bug test.
>>>>>
>>>>> I would appreciate clarification on test issue #3. And a review of the
>>>>> PR, if done, would be appreciated even more.
>>>>>
>>>>> Santosh,
>>>>> I believe my PR would do a full partition retry (if the fetch failure
>>>>> happened before any other final task succeeded; otherwise it will abort
>>>>> the query by throwing an exception). No wrong results, I believe.
>>>>> Your review will be much appreciated.
>>>>> Regards
>>>>> Asif
>>>>>
>>>>>
>>>>> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> > Have identified a race condition in the current
>>>>>> DAGScheduler code base
>>>>>>
>>>>>> Does it lead to the failure of full-stage retry? Can you also share
>>>>>> your PR so that we can take a closer look?
>>>>>>
>>>>>> BTW, I'm working with my colleagues on this runtime checksum idea. We
>>>>>> have something similar in our internal infra and I think we can create a
>>>>>> working solution shortly.
>>>>>>
>>>>>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <
>>>>>> santosh.ping...@adyen.com> wrote:
>>>>>>
>>>>>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>>>>> but the data becomes different (e.g. use checksum to check it), then
>>>>>>> Spark should retry all the partitions of this stage.
>>>>>>>
>>>>>>> Having been bitten hard by this behavior in Spark recently, and
>>>>>>> having gone down the rabbit hole to investigate, I think we should
>>>>>>> definitely try to refine and implement this approach. The
>>>>>>> non-determinism depends on several factors, including the data types
>>>>>>> of the columns or the use of repartition; it is not easy for users to
>>>>>>> reason about this behavior.
>>>>>>>
>>>>>>> PS: The JIRA that I found, which helped me confirm my understanding of
>>>>>>> the problem and educate internal users and the platform team:
>>>>>>> https://issues.apache.org/jira/browse/SPARK-38388. The checksum
>>>>>>> approach was brought up in that JIRA too, and I feel that it is a
>>>>>>> balanced way to look at this problem.
>>>>>>>
>>>>>>> Kind regards
>>>>>>> Santosh
>>>>>>>
>>>>>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Following up on the issue with some information:
>>>>>>>> 1) I have opened a PR to fix the isIndeterminate method in
>>>>>>>> Stage, RDD, etc.
>>>>>>>> 2) After tweaking the source code, I have been able to reproduce the
>>>>>>>> data loss issue reliably, but I cannot productize the test, as it
>>>>>>>> requires a lot of inline interception and coaxing to induce the test
>>>>>>>> failure in a single-VM unit test. I will post a diff of the source
>>>>>>>> code patch and the bug test.
>>>>>>>> 3) I have identified a race condition in the current
>>>>>>>> DAGScheduler code base:
>>>>>>>>     In the current DAGScheduler code, in the function
>>>>>>>> handleTaskCompletion, when a task fails, before resubmitting the
>>>>>>>> failed stage via ResubmitFailedStages, it checks whether the
>>>>>>>> shuffle is indeterminate. If it is, the stage is resubmitted ONLY IF
>>>>>>>> the ResultStage has ZERO partitions processed. That is right, but the
>>>>>>>> check is not atomic with the later point where the stage's missing
>>>>>>>> partitions are computed. As a result, a task from the current
>>>>>>>> attempt can mark one of the partitions as successful in between, and
>>>>>>>> when the new attempt is made, instead of getting all partitions as
>>>>>>>> missing, it gets only some of them.
>>>>>>>> I will be modifying the PR to fix the race.
>>>>>>>> Regards
>>>>>>>> Asif
>>>>>>>>
>>>>>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Shouldn't it be possible to determine from static data whether the
>>>>>>>>> output will be deterministic? Expressions already have a
>>>>>>>>> deterministic flag. So when an attribute is created from an alias,
>>>>>>>>> it should be possible to know whether the attribute points to an
>>>>>>>>> indeterminate component.
>>>>>>>>>
>>>>>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> It looks like a hard problem to statically analyze the query plan
>>>>>>>>>> and decide whether a Spark stage is deterministic or not. When I
>>>>>>>>>> added RDD DeterministicLevel, I thought it was not a hard problem
>>>>>>>>>> for the callers to specify it, but it seems I was wrong.
>>>>>>>>>>
>>>>>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle
>>>>>>>>>> stage but the data becomes different (e.g. use checksum to check
>>>>>>>>>> it), then Spark should retry all the partitions of this stage. I'll
>>>>>>>>>> look into this repro after I'm back from the national holiday.
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <
>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I am using the test below. As a unit test it will pass, though,
>>>>>>>>>>> as simulating a lost executor in a single VM is difficult, but
>>>>>>>>>>> there is definitely a bug.
>>>>>>>>>>> If you check with a debugger, the shuffle stage is reported as
>>>>>>>>>>> determinate, though it clearly should not be.
>>>>>>>>>>> As a result, if you look at the DAGScheduler, TaskSchedulerImpl and
>>>>>>>>>>> TaskSet code, it will not retry the shuffle stage fully; instead it
>>>>>>>>>>> would only retry the missing tasks.
>>>>>>>>>>> This means that on retry, if the join used a random value which puts
>>>>>>>>>>> a row in an already completed shuffle task, the results will be
>>>>>>>>>>> missed.
>>>>>>>>>>>
>>>>>>>>>>>> package org.apache.spark.sql.vectorized
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>>>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>>>>>>>
>>>>>>>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>>>>>>>   import testImplicits._
>>>>>>>>>>>>   /* test("no retries") {
>>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>>       val baseDf = spark.createDataset(
>>>>>>>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"),
>>>>>>>>>>>>           (null, "x"), (null, "y"), (null, "z")))(
>>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).
>>>>>>>>>>>>         toDF("pkLeft", "strleft")
>>>>>>>>>>>>
>>>>>>>>>>>>       val leftOuter = baseDf.select(
>>>>>>>>>>>>         $"strleft",
>>>>>>>>>>>>         when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
>>>>>>>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>>>>>>>       leftOuter.show(10000)
>>>>>>>>>>>>
>>>>>>>>>>>>       val innerRight = spark.createDataset(
>>>>>>>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).
>>>>>>>>>>>>         toDF("pkRight", "strright")
>>>>>>>>>>>>
>>>>>>>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
>>>>>>>>>>>>
>>>>>>>>>>>>       innerjoin.show(1000)
>>>>>>>>>>>>
>>>>>>>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>>
>>>>>>>>>>>>       outerjoin.show(1000)
>>>>>>>>>>>>     }
>>>>>>>>>>>>   } */
>>>>>>>>>>>>
>>>>>>>>>>>>   test("with retries") {
>>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>>       withTable("outer", "inner") {
>>>>>>>>>>>>         createBaseTables()
>>>>>>>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>>>>>>>
>>>>>>>>>>>>         println("Initial data")
>>>>>>>>>>>>         outerjoin.show(1000)
>>>>>>>>>>>>         val correctRows = outerjoin.collect()
>>>>>>>>>>>>         for (i <- 0 until 100) {
>>>>>>>>>>>>           // throwException is a hook added by the bugrepro patch (it is not
>>>>>>>>>>>>           // in stock Spark), presumably to force a task failure and a retry.
>>>>>>>>>>>>           FileScanRDD.throwException = false
>>>>>>>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>>>>>>>           val temp =
>>>>>>>>>>>>             spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>>>>>>>>           println("after retry data")
>>>>>>>>>>>>           temp.show(1000)
>>>>>>>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>>>>>>>           // Match every expected row against exactly one retried row.
>>>>>>>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>>>>>>>           correctRows.foreach(r => {
>>>>>>>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>>>>>>>                 (r.getLong(1) == x.getLong(1) ||
>>>>>>>>>>>>                   (x.isNullAt(2) && r.isNullAt(2) && x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
>>>>>>>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
>>>>>>>>>>>>             assert(index >= 0)
>>>>>>>>>>>>             retriedResults.remove(index)
>>>>>>>>>>>>           })
>>>>>>>>>>>>           assert(retriedResults.isEmpty)
>>>>>>>>>>>>         }
>>>>>>>>>>>>
>>>>>>>>>>>>         // Thread.sleep(10000000)
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   private def createBaseTables(): Unit = {
>>>>>>>>>>>>     /*val outerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>>     outerDf.write.format("parquet").saveAsTable("outer")*/
>>>>>>>>>>>>
>>>>>>>>>>>>     val outerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>>>>>>>     outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>>>>>>>
>>>>>>>>>>>>     /*val innerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")*/
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>>
>>>>>>>>>>>>     innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>>>>>>>
>>>>>>>>>>>>     // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   private def getOuterJoinDF = {
>>>>>>>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>>>>>>>       $"strleft",
>>>>>>>>>>>>       when(isnull($"pkLeftt"), floor(rand() * Literal(10000000L)).cast(LongType)).
>>>>>>>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerRight = spark.table("inner")
>>>>>>>>>>>>     // val innerinnerRight = spark.table("innerinner")
>>>>>>>>>>>>
>>>>>>>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>>     outerjoin
>>>>>>>>>>>>     /*
>>>>>>>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>>>>>>>     outerOuterJoin
>>>>>>>>>>>>      */
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <
>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sure, I will send a prototypical query tomorrow. Though it is
>>>>>>>>>>>> difficult to simulate the issue using a unit test, I think the
>>>>>>>>>>>> issue is that Rdd.isIndeterminate is not returning true for the
>>>>>>>>>>>> query. As a result, on retry, the shuffle stage is not
>>>>>>>>>>>> reattempted fully.
>>>>>>>>>>>> The RDD is not reported as indeterminate because ShuffledRDD does
>>>>>>>>>>>> not account for it, as the ShuffleDependency does not take into
>>>>>>>>>>>> account the indeterminate nature of the HashPartitioner.
>>>>>>>>>>>> Moreover, the attribute ref provided to the HashPartitioner,
>>>>>>>>>>>> though it points to an indeterminate alias, has its deterministic
>>>>>>>>>>>> flag set to true (which in itself is logically correct).
>>>>>>>>>>>> So I feel that, apart from the ShuffleDependency code change, all
>>>>>>>>>>>> expressions should have a lazy Boolean, say
>>>>>>>>>>>> containsIndeterministicComponent, which will be true if the
>>>>>>>>>>>> expression is indeterministic or contains any attribute ref for
>>>>>>>>>>>> which that flag is true.
>>>>>>>>>>>>
>>>>>>>>>>>> And on a personal note, thanks for your interest; this is a very
>>>>>>>>>>>> rare attitude.
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Asif
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <
>>>>>>>>>>>> angel.alvarez.pas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Asif,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could you provide an example (code + dataset) to analyze this?
>>>>>>>>>>>>> Looks interesting ...
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Ángel
>>>>>>>>>>>>>
>>>>>>>>>>>>> El dom, 26 ene 2025 a las 20:58, Asif Shahid (<
>>>>>>>>>>>>> asif.sha...@gmail.com>) escribió:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> On further thought, I concur that leaf expressions like
>>>>>>>>>>>>>> AttributeReference can always be considered deterministic:
>>>>>>>>>>>>>> like a Java variable, the value it holds per iteration is
>>>>>>>>>>>>>> invariant (except when changed by some deterministic logic).
>>>>>>>>>>>>>> So in that sense, what I described as an issue in my earlier
>>>>>>>>>>>>>> mail is incorrect.
>>>>>>>>>>>>>> But I think that AttributeReference should have a boolean method
>>>>>>>>>>>>>> which tells whether the value it represents comes from an
>>>>>>>>>>>>>> indeterminate source or not.
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <
>>>>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> I was testing a use case where the query had an outer join
>>>>>>>>>>>>>>> such that the joining key of the left outer table had either
>>>>>>>>>>>>>>> a valid value or a random value (salting to avoid skew).
>>>>>>>>>>>>>>> The case was reported to give incorrect results in the event
>>>>>>>>>>>>>>> of a node failure, with retry.
>>>>>>>>>>>>>>> On debugging the code, I found the following, which has left
>>>>>>>>>>>>>>> me confused as to what Spark's strategy is for indeterministic
>>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>>> Some serious issues are:
>>>>>>>>>>>>>>> 1) All the leaf expressions, like AttributeReference, are
>>>>>>>>>>>>>>> always considered deterministic. This means that if an
>>>>>>>>>>>>>>> attribute points to an Alias which itself is indeterministic,
>>>>>>>>>>>>>>> the attribute will still be considered deterministic.
>>>>>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>>>>>>>> operator supports indeterministic values or not. Join is not
>>>>>>>>>>>>>>> included in the list of supported operators, but it passes
>>>>>>>>>>>>>>> even if the joining key points to an indeterministic alias.
>>>>>>>>>>>>>>> (When I tried fixing it, I found a plethora of operators
>>>>>>>>>>>>>>> failing, like DeserializedObject, LocalRelation, etc., which
>>>>>>>>>>>>>>> are not supposed to contain indeterministic attributes,
>>>>>>>>>>>>>>> because they are not in the list of supporting operators.)
>>>>>>>>>>>>>>> 3) The ShuffleDependency does not check for the indeterministic
>>>>>>>>>>>>>>> nature of the partitioner (I fixed it locally and then
>>>>>>>>>>>>>>> realized that bug #1 needs to be fixed too).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The code in DAGScheduler, TaskSet, TaskScheduler, etc. seems
>>>>>>>>>>>>>>> to have been written keeping in mind the indeterministic
>>>>>>>>>>>>>>> nature of the previous and current stages, so as to re-execute
>>>>>>>>>>>>>>> previous stages as a whole instead of just the missing tasks,
>>>>>>>>>>>>>>> but the above 3 points do not seem to support the code of
>>>>>>>>>>>>>>> DAGScheduler / TaskScheduler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
