Checksum check as a fallback could indeed make sense. Good to have a catch-all way here.
> BTW, I'm working with my colleagues on this runtime checksum idea. We have something similar in our internal infra and I think we can create a working solution shortly.

@Wenchen Fan <cloud0...@gmail.com> is there any SPIP/JIRA ticket/PR for the work you are doing with your colleagues?

On Sun, Feb 16, 2025, 6:43 PM Asif Shahid <asif.sha...@gmail.com> wrote:

> Hi.
> OK, did the final check-in. Please feel free to review.
> Regards
> Asif
>
> On Sat, Feb 15, 2025 at 6:42 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>
>> Please hold off on reviewing the patch, as I need to do one more check-in.
>> I have still left a window of race, by releasing the read lock early, for the case where the first task is successful and the failing task has yet to acquire the write lock.
>> Regards
>> Asif
>>
>> On Thu, Feb 13, 2025 at 10:48 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>
>>> Hi Wenchen,
>>> Apologies... I thought it was Santosh asking for the PR and the race condition...
>>> Yes, the race condition causes only some partitions to be retried.
>>> Maybe a checksum can be used as a fallback mechanism?
>>> I suppose the checksum would be implemented on the executor side..? Then there might be a condition where shuffle files could be lost before the checksum is communicated between driver and executors?
>>> Regards
>>> Asif
>>>
>>> On Thu, Feb 13, 2025 at 7:39 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>
>>>> The bug-repro patch, when applied on current master, will show failures with incorrect results.
>>>> On the PR branch, it will pass.
>>>> The number of iterations in the test is 100.
>>>>
>>>> Regards
>>>> Asif
>>>>
>>>> On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> Following up on this issue.
>>>>> 1) The PR opened is: https://github.com/apache/spark/pull/49708
>>>>> The PR IMO fixes the three issues which I have described below. The race-condition fix requires the use of a read-write lock.
>>>>>
>>>>> 2) The JIRA is: https://issues.apache.org/jira/browse/SPARK-51016
>>>>>
>>>>> A checksum approach would result in a greater number of retries, I believe: there is already a mechanism in Expression to identify indeterminism.
>>>>>
>>>>> Anyway, the issues, which I think are multiple, that the opened PR addresses:
>>>>> 1) stage.isIndeterminate is buggy, as it is not able to identify indeterminate stages; the fix spans files like ShuffleDependency, RDD etc.
>>>>> 2) The race condition in DAGScheduler (a sketch of the window follows below):
>>>>>   a) Say task1, corresponding to partition P1 of an indeterminate result stage, fails with a fetch failure.
>>>>>   b) The code rightly checks whether any previous partitions of the result stage are successful yet; since none are, instead of aborting with an exception, it decides to retry.
>>>>>   c) During this retry window (checking dependencies and making the new attempt initiated by task1), task2, corresponding to partition P2 of the result stage, becomes successful.
>>>>>   d) The new attempt now sees n - 1 missing partitions instead of all of them, which creates data corruption, as it is no longer retrying all partitions.
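A minimal sketch of the window described in point 2 above. This is a toy model, not actual DAGScheduler code: the partition bookkeeping and the method names are hypothetical, and the lock discipline only mirrors the approach the PR describes (successful tasks take the read lock; the fetch-failure path takes the write lock so the zero-finished check and the missing-partition computation happen atomically):

import java.util.concurrent.locks.ReentrantReadWriteLock

object IndeterminateRetryRaceSketch {
  private val lock = new ReentrantReadWriteLock()
  private val finished = Array.fill(4)(false) // four result partitions

  // Task-success path: holds the read lock, so successes can proceed
  // concurrently with each other, but never while a resubmission
  // decision for the indeterminate stage is in flight.
  def onTaskSuccess(partition: Int): Unit = {
    val r = lock.readLock()
    r.lock()
    try finished(partition) = true
    finally r.unlock()
  }

  // Fetch-failure path: the write lock makes the "no partition has
  // finished yet" check and the missing-partition computation atomic.
  // Without it, a concurrent onTaskSuccess can flip a flag in between,
  // and the retry sees n - 1 missing partitions instead of all n.
  def onFetchFailure(): Seq[Int] = {
    val w = lock.writeLock()
    w.lock()
    try {
      if (finished.exists(identity)) {
        throw new IllegalStateException(
          "a partition already completed; abort the query instead of a partial retry")
      }
      finished.indices.filterNot(i => finished(i)) // safely all partitions
    } finally w.unlock()
  }
}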
>>>>> 3) The DAGSchedulerSuite tests:
>>>>>   a) test("SPARK-25341: retry all the succeeding stages when the map stage is indeterminate")
>>>>>   has the following assertion:
>>>>>
>>>>>   assert(failedStages.collect {
>>>>>     case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>>>   }.head.findMissingPartitions() == Seq(0))
>>>>>
>>>>>   If my understanding is right, and as seen from the outcome of the opened PR, since the ShuffleMapStage turns out to be indeterminate, all tasks should be retried, so the assertion's expected value should be Seq(0, 1).
>>>>>
>>>>>   b) If #a above is valid, then the second test,
>>>>>   test("SPARK-32923: handle stage failure for indeterminate map stage with push-based shuffle"),
>>>>>   which has the assertion:
>>>>>
>>>>>   assert(failedStages.collect {
>>>>>     case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>>>   }.head.findMissingPartitions() == Seq(0))
>>>>>
>>>>>   should also expect Seq(0, 1) instead.
>>>>>
>>>>> I am also attaching a patch which I was using as a basis to debug and fix. The patch modifies the source code using inline code interception and tweaks to reproduce the data-corruption issue in a unit test.
>>>>> And I am also attaching the BugTest which I used.
>>>>> The test is supposed to pass either if it encounters an exception from the Spark layer, with the error
>>>>> "Please eliminate the indeterminacy by checkpointing the RDD before repartition and try again",
>>>>> or if the results of the retry are as expected (12 rows, along with a data-correctness check).
>>>>>
>>>>> Meanwhile I will try to productize the bug test.
>>>>>
>>>>> Would appreciate clarification on test issue #3. And a review of the PR, if done, would be appreciated much more.
>>>>>
>>>>> Santosh,
>>>>> I believe my PR would do a full-partition retry (if the fetch failure happened prior to any other final task being successful; otherwise it will abort the query by throwing an exception). No wrong results, I believe.
>>>>> Your review will be much appreciated.
>>>>> Regards
>>>>> Asif
>>>>>
>>>>> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>
>>>>>> > Have identified a race condition in the current DAGScheduler code base
>>>>>>
>>>>>> Does it lead to the failure of full-stage retry? Can you also share your PR so that we can take a closer look?
>>>>>>
>>>>>> BTW, I'm working with my colleagues on this runtime checksum idea. We have something similar in our internal infra and I think we can create a working solution shortly.
>>>>>>
>>>>>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <santosh.ping...@adyen.com> wrote:
>>>>>>
>>>>>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage but the data becomes different (e.g. use checksum to check it), then Spark should retry all the partitions of this stage.
>>>>>>>
>>>>>>> Having been bitten hard recently by this behavior in Spark, and after having gone down the rabbit hole to investigate, we should definitely try to refine and implement this approach. The non-determinism depends on several factors, including the datatypes of the columns or the use of repartition; it is not easy for users to reason about this behavior.
>>>>>>>
>>>>>>> PS: The JIRA that I found, which helped me confirm my understanding of the problem and educate internal users and the platform team: https://issues.apache.org/jira/browse/SPARK-38388. The checksum approach was brought up in that JIRA too, and I feel that is the balanced way to look at this problem.
>>>>>>>
>>>>>>> Kind regards
>>>>>>> Santosh
>>>>>>>
>>>>>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Following up on the issue with some information:
>>>>>>>> 1) Have opened a PR to fix the isIndeterminate method in Stage, RDD etc.
>>>>>>>> 2) After tweaking the source code, I have been able to reproduce the data-loss issue reliably, but cannot productize the test, as it requires a lot of inline interception and coaxing to induce the failure in a single-VM unit test. Will post a diff of the source-code patch and the bug test.
>>>>>>>> 3) Have identified a race condition in the current DAGScheduler code base:
>>>>>>>> In the current DAGScheduler code, in the function *handleTaskCompletion*, when a task fails, while resubmitting the failure as ResubmitFailedStages, prior to submitting, it checks whether the shuffle is indeterminate. If it is, the stage is submitted ONLY IF the ResultStage has ZERO partitions processed. That is right, but the operation is not atomic up to the point where missingPartitions is invoked on the stage. As a result, it is possible that one of the tasks from the current attempt marks one of the partitions as successful, and when the new attempt is made, instead of seeing all partitions as missing, it fetches only some of them.
>>>>>>>> Will be modifying the PR to fix the race.
>>>>>>>> Regards
>>>>>>>> Asif
>>>>>>>>
>>>>>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Shouldn't it be possible to determine statically whether the output will be deterministic? Expressions already have a deterministic flag. So when an attribute is created from an alias, it should be possible to know whether the attribute points to an indeterminate component.
>>>>>>>>>
>>>>>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> It looks like a hard problem to statically analyze the query plan and decide whether a Spark stage is deterministic or not. When I added RDD DeterministicLevel, I thought it was not a hard problem for the callers to specify it, but it seems I was wrong.
>>>>>>>>>>
>>>>>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle stage but the data becomes different (e.g. use checksum to check it), then Spark should retry all the partitions of this stage. I'll look into this repro after I'm back from the national holiday.
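A minimal sketch of that runtime check. The names here are hypothetical, not real Spark APIs; it assumes some hook records a checksum per map-output partition on each attempt, and the comparison then decides the retry scope:

import java.util.zip.CRC32

object ShuffleChecksumSketch {
  // Checksum of one map-output partition's serialized bytes.
  def checksum(partitionBytes: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(partitionBytes)
    crc.getValue
  }

  // If any partition produced by both attempts hashes differently, the
  // stage output is not reproducible, so every downstream partition must
  // be recomputed, not just the ones whose shuffle files were lost.
  def mustRetryAllPartitions(
      firstAttempt: Map[Int, Long],
      retryAttempt: Map[Int, Long]): Boolean =
    retryAttempt.exists { case (pid, sum) =>
      firstAttempt.get(pid).exists(_ != sum)
    }
}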
>>>>>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I am using the test below. As a unit test it will pass, since simulating executor loss in a single VM is difficult, but there is definitely a bug.
>>>>>>>>>>> Using a debugger you can check that ShuffleStage.isDeterminate turns out to be true, though it clearly should not be.
>>>>>>>>>>> As a result, if you look at the DAGScheduler, TaskSchedulerImpl and TaskSet code, it will not retry the shuffle stage fully; instead it will only retry the missing tasks.
>>>>>>>>>>> Which means that on retry, if the join used a random value which puts a row in an already completed shuffle task, the results will be missed.
>>>>>>>>>>>
>>>>>>>>>>>> package org.apache.spark.sql.vectorized
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>>>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>>>>>>>
>>>>>>>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>>>>>>>   import testImplicits._
>>>>>>>>>>>>
>>>>>>>>>>>>   /* test("no retries") {
>>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>>       val baseDf = spark.createDataset(
>>>>>>>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), (null, "y"), (null, "z")))(
>>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>>
>>>>>>>>>>>>       val leftOuter = baseDf.select(
>>>>>>>>>>>>         $"strleft", when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
>>>>>>>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>>>>>>>       leftOuter.show(10000)
>>>>>>>>>>>>
>>>>>>>>>>>>       val innerRight = spark.createDataset(
>>>>>>>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>>
>>>>>>>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
>>>>>>>>>>>>       innerjoin.show(1000)
>>>>>>>>>>>>
>>>>>>>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>>       outerjoin.show(1000)
>>>>>>>>>>>>     }
>>>>>>>>>>>>   } */
>>>>>>>>>>>>
>>>>>>>>>>>>   test("with retries") {
>>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>>       withTable("outer", "inner") {
>>>>>>>>>>>>         createBaseTables()
>>>>>>>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>>>>>>>
>>>>>>>>>>>>         println("Initial data")
>>>>>>>>>>>>         outerjoin.show(1000)
>>>>>>>>>>>>         val correctRows = outerjoin.collect()
>>>>>>>>>>>>         for (i <- 0 until 100) {
>>>>>>>>>>>>           // Hooks added by the accompanying source patch to induce a
>>>>>>>>>>>>           // fetch failure and force a retry of the shuffle stage.
>>>>>>>>>>>>           FileScanRDD.throwException = false
>>>>>>>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>>>>>>>           val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>>>>>>>>           println("after retry data")
>>>>>>>>>>>>           temp.show(1000)
>>>>>>>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>>>>>>>           correctRows.foreach(r => {
>>>>>>>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>>>>>>>                 (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && r.isNullAt(2) &&
>>>>>>>>>>>>                   x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
>>>>>>>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
>>>>>>>>>>>>             assert(index >= 0)
>>>>>>>>>>>>             retriedResults.remove(index)
>>>>>>>>>>>>           })
>>>>>>>>>>>>           assert(retriedResults.isEmpty)
>>>>>>>>>>>>         }
>>>>>>>>>>>>         // Thread.sleep(10000000)
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   private def createBaseTables(): Unit = {
>>>>>>>>>>>>     /* val outerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>>     outerDf.write.format("parquet").saveAsTable("outer") */
>>>>>>>>>>>>
>>>>>>>>>>>>     val outerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>>>>>>>     outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>>>>>>>
>>>>>>>>>>>>     /* val innerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright") */
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>>     innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>>>>>>>     // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   private def getOuterJoinDF = {
>>>>>>>>>>>>     // Salted key: null pkLeftt values are replaced with a rand()-derived
>>>>>>>>>>>>     // value, which makes the shuffle key indeterminate across retries.
>>>>>>>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>>>>>>>       $"strleft", when(isnull($"pkLeftt"), floor(rand() * Literal(10000000L)).cast(LongType)).
>>>>>>>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>>>>>>>
>>>>>>>>>>>>     val innerRight = spark.table("inner")
>>>>>>>>>>>>     // val innerinnerRight = spark.table("innerinner")
>>>>>>>>>>>>
>>>>>>>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>>     outerjoin
>>>>>>>>>>>>     /*
>>>>>>>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>>>>>>>     outerOuterJoin
>>>>>>>>>>>>     */
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sure. I will send a prototypical query tomorrow.
>>>>>>>>>>>> Though it is difficult to simulate the issue in a unit test, I think the issue is that RDD.isIndeterminate is not returning true for the query. As a result, on retry, the shuffle stage is not reattempted fully.
>>>>>>>>>>>> And the RDD not returning indeterminate as true is because the ShuffleDependency does not take into account the indeterminate nature of the HashPartitioner.
>>>>>>>>>>>> Moreover, the attribute ref provided to the HashPartitioner, though pointing to an indeterminate alias, has its deterministic flag set to true (which in itself is logically correct).
>>>>>>>>>>>> So I feel that, apart from the ShuffleDependency code change, all expressions should have a lazy Boolean, say containsIndeterministic, which will be true if the expression is indeterministic or contains any attribute ref whose indeterministic-component flag is true.
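A sketch of that proposed flag on a toy expression tree. Catalyst's real Expression class is more involved, and indeterministicSource on the attribute is the hypothetical bit discussed in this thread, not an existing API:

sealed trait Expr {
  def children: Seq[Expr]
  def deterministic: Boolean = true
  // True if this node is itself non-deterministic, or any attribute in
  // the subtree resolves to a non-deterministic source (e.g. an alias
  // of rand()).
  lazy val containsIndeterministic: Boolean =
    !deterministic || children.exists(_.containsIndeterministic)
}

case class AttrRef(name: String, indeterministicSource: Boolean) extends Expr {
  def children: Seq[Expr] = Nil
  // The attribute itself stays "deterministic" (its value is fixed per
  // row), but it taints the tree if it points at an indeterminate alias.
  override lazy val containsIndeterministic: Boolean = indeterministicSource
}

case class Add(left: Expr, right: Expr) extends Expr {
  def children: Seq[Expr] = Seq(left, right)
}

case class Rand() extends Expr {
  def children: Seq[Expr] = Nil
  override def deterministic: Boolean = false
}

object ContainsIndeterministicDemo extends App {
  // pkLeft plays the role of the salted join key: an attribute that
  // points at an indeterminate alias.
  val pkLeft = AttrRef("pkLeft", indeterministicSource = true)
  val joinKey = Add(pkLeft, AttrRef("offset", indeterministicSource = false))
  assert(joinKey.containsIndeterministic) // taints the shuffle key
  assert(pkLeft.deterministic)            // while deterministic stays true
}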
>>>>>>>>>>>>
>>>>>>>>>>>> And on a personal note... thanks for your interest; this is a very rare attitude.
>>>>>>>>>>>> Regards
>>>>>>>>>>>> Asif
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <angel.alvarez.pas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Asif,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Could you provide an example (code + dataset) to analyze this? Looks interesting...
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Ángel
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Jan 26, 2025 at 8:58 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> On further thought, I concur that leaf expressions like AttributeReferences can always be considered deterministic, since, as a Java variable, the value contained in them per iteration is invariant (except when changed by some deterministic logic). So in that sense, what I said in the above mail, that this is an issue, is incorrect.
>>>>>>>>>>>>>> But I think that AttributeReference should have a boolean method which tells whether the value it represents comes from an indeterminate source or not.
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> While testing a use case where the query had an outer join such that the joining key of the left outer table had either a valid value or a random value (salting to avoid skew), the case was reported to produce incorrect results on node failure, with retry.
>>>>>>>>>>>>>>> On debugging the code, I have found the following, which has left me confused as to what Spark's strategy for indeterministic fields is. Some serious issues are:
>>>>>>>>>>>>>>> 1) All the leaf expressions, like AttributeReference, are always considered deterministic. Which means that if an attribute points to an Alias which itself is indeterministic, the attribute will still be considered deterministic.
>>>>>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each operator supports indeterministic values or not. Join is not included in the list of supported operators, yet it passes even if the joining key points to an indeterministic alias. (When I tried fixing it, I found a plethora of operators failing, like DeserializeToObject, LocalRelation etc., which are not supposed to contain indeterministic attributes, because they are not in the list of supporting operators.)
>>>>>>>>>>>>>>> 3) The ShuffleDependency does not check for the indeterministic nature of the partitioner (fixed it locally and then realized that there is bug #1, which needs to be fixed too).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The code in DAGScheduler / TaskSet / TaskScheduler etc. seems to have been written keeping in mind the indeterministic nature of the previous and current stages, so as to re-execute previous stages as a whole instead of just the missing tasks, but the above three points do not seem to support the code of DAGScheduler / TaskScheduler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>> Asif
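A toy rendering of point 3 from the last mail above: a shuffle's determinism should be derived from the partitioner as well as its input. ToyPartitioner and its deterministic flag are hypothetical here; Spark's real Partitioner has no such method today:

trait ToyPartitioner {
  def numPartitions: Int
  def getPartition(key: Any): Int
  // Hypothetical flag: false when the key expression involves rand()
  // (e.g. salted join keys), so the same logical row can land in
  // different partitions across attempts.
  def deterministic: Boolean
}

// A hash partitioner over a salted key: indeterminate routing.
case class SaltedHashPartitioner(numPartitions: Int) extends ToyPartitioner {
  def getPartition(key: Any): Int = math.abs(key.hashCode % numPartitions)
  def deterministic: Boolean = false
}

object ShuffleDeterminism {
  // The shuffle output is determinate only if both the input rows and
  // the routing of rows to partitions are reproducible on retry.
  def shuffleIsDeterminate(inputDeterminate: Boolean, p: ToyPartitioner): Boolean =
    inputDeterminate && p.deterministic
}

Under this model, the salted outer join from the BugTest would report its shuffle as indeterminate even though every individual AttributeReference keeps deterministic == true, which is the combination the thread identifies as the gap.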