A checksum check as a fallback could indeed make sense. It is good to have a
catch-all here.
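
To make the fallback idea concrete, here is a minimal sketch, assuming each map task records a checksum of the rows it wrote and that a re-run of the same partition can be compared against the recorded value. The object and method names are hypothetical and are not taken from Spark or from any existing proposal.

```scala
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

// Hypothetical sketch, not Spark code: detect at runtime that a re-run map
// task produced different output than its first run.
object ChecksumFallbackSketch {
  // Order-sensitive CRC32 over the rows written by one map task.
  def checksum(rows: Seq[String]): Long = {
    val crc = new CRC32()
    rows.foreach { row =>
      crc.update(row.getBytes(StandardCharsets.UTF_8))
      crc.update(0) // separator byte so that row boundaries affect the checksum
    }
    crc.getValue
  }

  // True when a checksum was recorded for the partition and the re-run
  // disagrees with it, i.e. the stage output is not repeatable and all
  // partitions should be retried.
  def needsFullRetry(recorded: Map[Int, Long], partition: Int, rerun: Long): Boolean =
    recorded.get(partition).exists(_ != rerun)
}
```

A partition with no recorded checksum is treated as a first run here; whether that is the right default is a design choice this sketch leaves open.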

> BTW, I'm working with my colleagues on this runtime checksum idea. We
have something similar in our internal infra and I think we can create a
working solution shortly

@Wenchen Fan <cloud0...@gmail.com> is there any SPIP, JIRA ticket, or PR for
the work you are doing with your colleagues?

On Sun, Feb 16, 2025, 6:43 PM Asif Shahid <asif.sha...@gmail.com> wrote:

> Hi,
> OK, I did the final check-in. Please feel free to review.
> Regards
> Asif
>
>
> On Sat, Feb 15, 2025 at 6:42 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>
>> Please hold off on reviewing the patch, as I need to do one more check-in.
>> I have still left a race window open by releasing the read lock early:
>> the first task has succeeded, but the failing task has yet to acquire
>> the write lock.
>> Regards
>> Asif
>>
>> On Thu, Feb 13, 2025 at 10:48 PM Asif Shahid <asif.sha...@gmail.com>
>> wrote:
>>
>>> Hi Wenchen,
>>> Apologies, I thought it was Santosh asking for the PR and about the race
>>> condition.
>>> Yes, the race condition causes only some partitions to be retried.
>>> Maybe a checksum can be used as a fallback mechanism?
>>> I suppose the checksum would be computed on the executor side? Then there
>>> might be a case where shuffle files are lost before the checksum has been
>>> communicated to the driver or the other executors?
>>> Regards
>>> Asif
>>>
>>>
>>> On Thu, Feb 13, 2025 at 7:39 PM Asif Shahid <asif.sha...@gmail.com>
>>> wrote:
>>>
>>>> The bug-repro patch, when applied to current master, fails the test
>>>> with incorrect results.
>>>> On the PR branch, the test passes.
>>>> The test runs 100 iterations.
>>>>
>>>> Regards
>>>> Asif
>>>>
>>>> On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> Following up on this issue.
>>>>> 1) The PR opened is:
>>>>> https://github.com/apache/spark/pull/49708
>>>>> IMO the PR fixes the 3 issues which I describe below.
>>>>> The race condition fix requires the use of a read-write lock.
>>>>>
>>>>> 2) The Jira is:
>>>>> https://issues.apache.org/jira/browse/SPARK-51016
>>>>>
>>>>> A checksum approach would result in a greater number of retries, I
>>>>> believe. There is already a mechanism in Expression to identify
>>>>> indeterminism.
>>>>>
>>>>> Anyway, I think there are multiple issues, which the opened PR
>>>>> addresses:
>>>>> 1) stage.isIndeterminate is buggy, as it fails to identify an
>>>>> indeterminate stage; the fix spans several files such as
>>>>> ShuffleDependency, RDD, etc.
>>>>> 2) The race condition in DAGScheduler:
>>>>>    a) Say task1, corresponding to partition P1 of an indeterminate
>>>>> result stage, fails with a fetch failure.
>>>>>    b) The code rightly checks whether any partition of the result
>>>>> stage has already succeeded. Since none has, instead of aborting with
>>>>> an exception, it decides to retry.
>>>>>    c) During this retry window, while dependencies are checked and the
>>>>> new attempt initiated by task1 is being made, task2, corresponding to
>>>>> partition P2 of the result stage, succeeds.
>>>>>    d) The new attempt now sees n - 1 missing partitions instead of all
>>>>> n. That causes data corruption, as it no longer retries all
>>>>> partitions.
>>>>>
>>>>> 3) The DAGSchedulerSuite tests:
>>>>>    a) test("SPARK-25341: retry all the succeeding stages when the
>>>>> map stage is indeterminate")
>>>>> has the following assertion:
>>>>>
>>>>> assert(failedStages.collect {
>>>>>   case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>>> }.head.findMissingPartitions() == Seq(0))
>>>>>
>>>>> If my understanding is right, and as seen from the outcome of the
>>>>> opened PR, the ShuffleMapStage turns out to be indeterminate, so all
>>>>> tasks should be retried. The expected value in the assertion should
>>>>> therefore be Seq(0, 1).
>>>>>
>>>>>    b) If #a above is valid, then the second test,
>>>>> test("SPARK-32923: handle stage failure for indeterminate map
>>>>> stage with push-based shuffle"),
>>>>> which has the assertion:
>>>>>
>>>>> assert(failedStages.collect {
>>>>>   case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>>> }.head.findMissingPartitions() == Seq(0))
>>>>>
>>>>> should also expect Seq(0, 1) instead.
>>>>>
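The race in point 2 above, and the read-write lock fix mentioned for it, can be illustrated with a minimal sketch. This is not the code in the PR: the class and method names are hypothetical, and the only assumption carried over from the thread is that the "no partition has succeeded yet" check and the computation of the partitions to retry must happen atomically with respect to task completions.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical stand-in for an indeterminate result stage; not Spark's classes.
final class ResultStageSketch(numPartitions: Int) {
  private val lock = new ReentrantReadWriteLock()
  private val completed = mutable.Set.empty[Int]
  private var retryStarted = false

  // Successful tasks share the read lock, so they do not block each other.
  // A success arriving after a full retry has been decided is discarded.
  def markSuccess(partition: Int): Boolean = {
    lock.readLock().lock()
    try {
      if (retryStarted) false
      else completed.synchronized { completed += partition; true }
    } finally lock.readLock().unlock()
  }

  // The fetch-failure path takes the write lock, so the "zero partitions
  // completed" check and the choice of partitions to retry are one atomic step.
  def onFetchFailure(): Either[String, Seq[Int]] = {
    lock.writeLock().lock()
    try {
      if (completed.nonEmpty) Left("abort: some partitions already succeeded")
      else {
        retryStarted = true
        Right(0 until numPartitions) // retry every partition
      }
    } finally lock.writeLock().unlock()
  }
}
```

With this shape, a task2 success can no longer slip in between the check and the retry: it either lands before the write lock is taken, in which case the failure handler aborts, or after, in which case it is ignored and partition 2 is recomputed with the rest.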
>>>>>
>>>>> I am also attaching a patch which I used as a basis to debug and fix.
>>>>> The patch modifies the source code, using inline code interception
>>>>> and tweaks, to reproduce the data corruption issue in a unit test.
>>>>>
>>>>> I am also attaching the BugTest which I used.
>>>>>
>>>>> The test is supposed to pass either if it encounters the exception
>>>>> from the Spark layer with the message
>>>>> "Please eliminate the indeterminacy by checkpointing the RDD before
>>>>> repartition and try again"
>>>>> or if the results of the retry are as expected (12 rows, along with a
>>>>> data correctness check).
>>>>>
>>>>> Meanwhile I will try to productize the bug test.
>>>>>
>>>>> I would appreciate clarification on test issue #3. A review of the
>>>>> PR would be appreciated even more.
>>>>>
>>>>> Santosh,
>>>>> I believe my PR would do a full partition retry (if the fetch failure
>>>>> happened before any other final task succeeded; otherwise it will abort
>>>>> the query by throwing an exception). No wrong results, I believe.
>>>>> Your review will be much appreciated.
>>>>> Regards
>>>>> Asif
>>>>>
>>>>>
>>>>> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> > Have identified a race condition in the current
>>>>>> DAGScheduler code base
>>>>>>
>>>>>> Does it lead to the failure of full-stage retry? Can you also share
>>>>>> your PR so that we can take a closer look?
>>>>>>
>>>>>> BTW, I'm working with my colleagues on this runtime checksum idea. We
>>>>>> have something similar in our internal infra and I think we can create a
>>>>>> working solution shortly.
>>>>>>
>>>>>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <
>>>>>> santosh.ping...@adyen.com> wrote:
>>>>>>
>>>>>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>>>>> but the data becomes different (e.g. use checksum to check it), then
>>>>>>> Spark should retry all the partitions of this stage.
>>>>>>>
>>>>>>> Having been bitten hard by this behavior in Spark recently, and
>>>>>>> having gone down the rabbit hole to investigate, I think we should
>>>>>>> definitely try to refine and implement this approach. The
>>>>>>> non-determinism depends on several factors, including the datatypes of
>>>>>>> the columns or the use of repartition; it is not easy for users to
>>>>>>> reason about this behavior.
>>>>>>>
>>>>>>> PS: The JIRA that I found, which helped me confirm my understanding
>>>>>>> of the problem and educate internal users and the platform team:
>>>>>>> https://issues.apache.org/jira/browse/SPARK-38388. The checksum
>>>>>>> approach was brought up in that JIRA too, and I feel it is the balanced
>>>>>>> way to look at this problem.
>>>>>>>
>>>>>>> Kind regards
>>>>>>> Santosh
>>>>>>>
>>>>>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Following up on the issue with some information:
>>>>>>>> 1) I have opened a PR to fix the isIndeterminate method in
>>>>>>>> Stage, RDD, etc.
>>>>>>>> 2) After tweaking the source code, I have been able to reproduce the
>>>>>>>> data loss issue reliably, but cannot productize the test, as it
>>>>>>>> requires a lot of inline interception and coaxing to induce the test
>>>>>>>> failure in a single-VM unit test. I will post a diff of the source
>>>>>>>> code patch and the bug test.
>>>>>>>> 3) I have identified a race condition in the current
>>>>>>>> DAGScheduler code base:
>>>>>>>>     In the current DAGScheduler code, in the function
>>>>>>>> handleTaskCompletion, when a task fails, before resubmitting the
>>>>>>>> failure as ResubmitFailedStage, it checks whether the shuffle is
>>>>>>>> indeterminate. If it is, the stage is submitted ONLY IF the
>>>>>>>> ResultStage has ZERO partitions processed. That is right, but the
>>>>>>>> check is not atomic with the later point where the stage's missing
>>>>>>>> partitions are computed. As a result, it is possible that a task
>>>>>>>> from the current attempt marks one of the partitions as successful,
>>>>>>>> and when the new attempt is made, instead of getting all partitions
>>>>>>>> as missing, it fetches only some of them.
>>>>>>>> I will be modifying the PR to fix the race.
>>>>>>>> Regards
>>>>>>>> Asif
>>>>>>>>
>>>>>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Shouldn't it be possible to determine from static data whether the
>>>>>>>>> output will be deterministic? Expressions already have a
>>>>>>>>> deterministic flag. So when an attribute is created from an alias, it
>>>>>>>>> will be possible to know if the attribute is pointing to an
>>>>>>>>> indeterminate component.
>>>>>>>>>
>>>>>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> It looks like a hard problem to statically analyze the query plan
>>>>>>>>>> and decide whether a Spark stage is deterministic or not. When I
>>>>>>>>>> added RDD DeterministicLevel, I thought it was not a hard problem for
>>>>>>>>>> the callers to specify it, but it seems I was wrong.
>>>>>>>>>>
>>>>>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle
>>>>>>>>>> stage but the data becomes different (e.g. use checksum to check
>>>>>>>>>> it), then Spark should retry all the partitions of this stage. I'll
>>>>>>>>>> look into this repro after I'm back from the national holiday.
>>>>>>>>>>
>>>>>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <
>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I am using the test below. As a unit test it will pass, since
>>>>>>>>>>> simulating a lost executor in a single VM is difficult, but there
>>>>>>>>>>> is definitely a bug.
>>>>>>>>>>> If you check with a debugger, ShuffleStage.isDeterminate turns out
>>>>>>>>>>> to be true, though it clearly should not be.
>>>>>>>>>>> As a result, if you look at the DAGScheduler, TaskSchedulerImpl and
>>>>>>>>>>> TaskSet code, it will not retry the ShuffleStage fully; instead it
>>>>>>>>>>> would only retry the missing tasks.
>>>>>>>>>>> This means that on retry, if the join used a random value that
>>>>>>>>>>> routes a row to an already completed shuffle task, that row will be
>>>>>>>>>>> missing from the results.
>>>>>>>>>>>
>>>>>>>>>>>> package org.apache.spark.sql.vectorized
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>>>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>>>>>>>
>>>>>>>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>>>>>>>   import testImplicits._
>>>>>>>>>>>>   /* test("no retries") {
>>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>>       val baseDf = spark.createDataset(
>>>>>>>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"),
>>>>>>>>>>>>           (null, "y"), (null, "z")))(
>>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>>
>>>>>>>>>>>>       val leftOuter = baseDf.select(
>>>>>>>>>>>>         $"strleft",
>>>>>>>>>>>>         when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
>>>>>>>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>>>>>>>       leftOuter.show(10000)
>>>>>>>>>>>>
>>>>>>>>>>>>       val innerRight = spark.createDataset(
>>>>>>>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>>
>>>>>>>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
>>>>>>>>>>>>
>>>>>>>>>>>>       innerjoin.show(1000)
>>>>>>>>>>>>
>>>>>>>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>>
>>>>>>>>>>>>       outerjoin.show(1000)
>>>>>>>>>>>>     }
>>>>>>>>>>>>   } */
>>>>>>>>>>>>
>>>>>>>>>>>>   test("with retries") {
>>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>>       withTable("outer", "inner") {
>>>>>>>>>>>>         createBaseTables()
>>>>>>>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>>>>>>>
>>>>>>>>>>>>         println("Initial data")
>>>>>>>>>>>>         outerjoin.show(1000)
>>>>>>>>>>>>         val correctRows = outerjoin.collect()
>>>>>>>>>>>>         for (i <- 0 until 100) {
>>>>>>>>>>>>           FileScanRDD.throwException = false
>>>>>>>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>>>>>>>           val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>>>>>>>>           println("after retry data")
>>>>>>>>>>>>           temp.show(1000)
>>>>>>>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>>>>>>>           correctRows.foreach(r => {
>>>>>>>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>>>>>>>                 (r.getLong(1) == x.getLong(1) ||
>>>>>>>>>>>>                   (x.isNullAt(2) && r.isNullAt(2) && x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
>>>>>>>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
>>>>>>>>>>>>             assert(index >= 0)
>>>>>>>>>>>>             retriedResults.remove(index)
>>>>>>>>>>>>           })
>>>>>>>>>>>>           assert(retriedResults.isEmpty)
>>>>>>>>>>>>         }
>>>>>>>>>>>>
>>>>>>>>>>>>         // Thread.sleep(10000000)
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   private def createBaseTables(): Unit = {
>>>>>>>>>>>>     /*val outerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>>     outerDf.write.format("parquet").saveAsTable("outer")*/
>>>>>>>>>>>>
>>>>>>>>>>>>     val outerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>>>>>>>     outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>>>>>>>
>>>>>>>>>>>>     /*val innerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")*/
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>>
>>>>>>>>>>>>     innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>>>>>>>
>>>>>>>>>>>>     // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   private def getOuterJoinDF = {
>>>>>>>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>>>>>>>       $"strleft",
>>>>>>>>>>>>       when(isnull($"pkLeftt"), floor(rand() * Literal(10000000L)).cast(LongType)).
>>>>>>>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerRight = spark.table("inner")
>>>>>>>>>>>>     // val innerinnerRight = spark.table("innerinner")
>>>>>>>>>>>>
>>>>>>>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>>     outerjoin
>>>>>>>>>>>>     /*
>>>>>>>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>>>>>>>     outerOuterJoin
>>>>>>>>>>>>      */
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <
>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sure. I will send a prototypical query tomorrow. Though it is
>>>>>>>>>>>> difficult to simulate the issue using a unit test, I think the
>>>>>>>>>>>> issue is that Rdd.isIndeterminate is not returning true for the
>>>>>>>>>>>> query. As a result, on retry, the shuffle stage is not reattempted
>>>>>>>>>>>> fully.
>>>>>>>>>>>> The RDD does not report indeterminate as true because ShuffleRdd
>>>>>>>>>>>> does not account for it: the ShuffleDependency does not take into
>>>>>>>>>>>> account the indeterminate nature of the HashPartitioner.
>>>>>>>>>>>> Moreover, the attribute ref provided to the HashPartitioner, though
>>>>>>>>>>>> it points to an indeterminate alias, has its deterministic flag set
>>>>>>>>>>>> to true (which in itself is logically correct).
>>>>>>>>>>>> So I feel that, apart from the ShuffleDependency code change, all
>>>>>>>>>>>> expressions should have a lazy Boolean, say "contains indeterministic
>>>>>>>>>>>> component", which will be true if the expression is indeterministic
>>>>>>>>>>>> or contains any attribute ref for which that flag is true.
>>>>>>>>>>>>
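The lazy Boolean proposed above can be sketched on a toy expression tree. This is only an illustration under stated assumptions: the types below are hypothetical stand-ins, not Catalyst's Expression, Alias, or AttributeReference, and the names containsIndeterministic and fromIndeterministicSource are invented for the example.

```scala
// Toy model, not Catalyst: every name here is hypothetical.
object ExprSketch {
  sealed trait Expr {
    def deterministic: Boolean
    def children: Seq[Expr]
    // True if this node, or anything beneath it, is indeterministic.
    lazy val containsIndeterministic: Boolean =
      !deterministic || children.exists(_.containsIndeterministic)
  }

  final case class Lit(value: Long) extends Expr {
    val deterministic = true
    val children: Seq[Expr] = Nil
  }

  final case class Rand() extends Expr {
    val deterministic = false
    val children: Seq[Expr] = Nil
  }

  final case class Add(left: Expr, right: Expr) extends Expr {
    val children: Seq[Expr] = Seq(left, right)
    val deterministic: Boolean = children.forall(_.deterministic)
  }

  final case class Alias(child: Expr, name: String) extends Expr {
    val children: Seq[Expr] = Seq(child)
    val deterministic: Boolean = child.deterministic
  }

  // A leaf reference stays deterministic, but remembers where its value came from.
  final case class AttrRef(name: String, fromIndeterministicSource: Boolean) extends Expr {
    val deterministic = true
    val children: Seq[Expr] = Nil
    override lazy val containsIndeterministic: Boolean = fromIndeterministicSource
  }

  // Creating an attribute from an alias carries the flag across the plan boundary.
  def toAttribute(alias: Alias): AttrRef =
    AttrRef(alias.name, alias.containsIndeterministic)
}
```

In this model a reference to a salted key such as Alias(Add(Rand(), Lit(100L)), "pkLeft") keeps deterministic == true, yet a partitioner inspecting containsIndeterministic would still learn that the key comes from an indeterministic source.
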
>>>>>>>>>>>> And on a personal note, thanks for your interest. This is a very
>>>>>>>>>>>> rare attitude.
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Asif
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <
>>>>>>>>>>>> angel.alvarez.pas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Asif,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could you provide an example (code + dataset) to analyze this?
>>>>>>>>>>>>> Looks interesting ...
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Ángel
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 26, 2025 at 8:58 PM Asif Shahid <
>>>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> On further thought, I concur that leaf expressions like
>>>>>>>>>>>>>> AttributeRefs can always be considered deterministic: like a
>>>>>>>>>>>>>> Java variable, the value an AttributeRef holds in a given
>>>>>>>>>>>>>> iteration is invariant (except when changed by some
>>>>>>>>>>>>>> deterministic logic). So in that sense, what I described as an
>>>>>>>>>>>>>> issue in my earlier mail is incorrect.
>>>>>>>>>>>>>> But I think that AttributeRef should have a boolean method
>>>>>>>>>>>>>> which tells whether the value it represents is from an
>>>>>>>>>>>>>> indeterminate source or not.
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <
>>>>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> I was testing a use case where the query had an outer join
>>>>>>>>>>>>>>> such that the joining key of the left outer table had either a
>>>>>>>>>>>>>>> valid value or a random value (salting to avoid skew).
>>>>>>>>>>>>>>> The case was reported to give incorrect results on retry after
>>>>>>>>>>>>>>> a node failure.
>>>>>>>>>>>>>>> On debugging the code, I found the following, which has left
>>>>>>>>>>>>>>> me confused as to what Spark's strategy for indeterministic
>>>>>>>>>>>>>>> fields is.
>>>>>>>>>>>>>>> Some serious issues are:
>>>>>>>>>>>>>>> 1) All leaf expressions, like AttributeReference, are always
>>>>>>>>>>>>>>> considered deterministic. This means that if an attribute points
>>>>>>>>>>>>>>> to an Alias which itself is indeterministic, the attribute will
>>>>>>>>>>>>>>> still be considered deterministic.
>>>>>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>>>>>>>> operator supports indeterministic values or not. Join is not
>>>>>>>>>>>>>>> included in the list of supported operators, but it passes even
>>>>>>>>>>>>>>> if the joining key points to an indeterministic alias. (When I
>>>>>>>>>>>>>>> tried fixing it, I found a plethora of operators failing, like
>>>>>>>>>>>>>>> DeserializedObject, LocalRelation, etc., which are not supposed
>>>>>>>>>>>>>>> to contain indeterministic attributes because they are not in
>>>>>>>>>>>>>>> the list of supporting operators.)
>>>>>>>>>>>>>>> 3) The ShuffleDependency does not check for the indeterministic
>>>>>>>>>>>>>>> nature of the partitioner (I fixed it locally and then realized
>>>>>>>>>>>>>>> that bug #1 needs to be fixed too).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The code in DAGScheduler, TaskSet, TaskScheduler, etc. seems
>>>>>>>>>>>>>>> to have been written keeping in mind the indeterministic nature
>>>>>>>>>>>>>>> of the previous and current stages, so as to re-execute previous
>>>>>>>>>>>>>>> stages as a whole instead of just the missing tasks, but the
>>>>>>>>>>>>>>> above 3 points do not seem to support the code of DAGScheduler /
>>>>>>>>>>>>>>> TaskScheduler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
