Hi. OK, I did the final check-in. Please feel free to review.
Regards
Asif
On Sat, Feb 15, 2025 at 6:42 PM Asif Shahid <asif.sha...@gmail.com> wrote:

> Please hold off on reviewing the patch, as I need to do one more check-in.
> I have still left a window for the race, by releasing the read lock early, in
> the case where the first task succeeds and the failing task has yet to
> acquire the write lock.
> Regards
> Asif
>
> On Thu, Feb 13, 2025 at 10:48 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>
>> Hi Wenchen,
>> Apologies... I thought it was Santosh asking for the PR and the race
>> condition...
>> Yes, the race condition causes only some partitions to be retried.
>> Maybe a checksum can be used as a fallback mechanism?
>> I suppose the checksum would be implemented on the executor side? Then there
>> might be a condition where shuffle files could be lost before the checksum
>> is communicated between the driver and executors?
>> Regards
>> Asif
>>
>> On Thu, Feb 13, 2025 at 7:39 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>
>>> The bug-repro patch, when applied on current master, will show a failure
>>> with incorrect results, while on the PR branch it will pass.
>>> The number of iterations in the test is 100.
>>> Regards
>>> Asif
>>>
>>> On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> Following up on this issue.
>>>> 1) The PR opened is: https://github.com/apache/spark/pull/49708
>>>> The PR, IMO, fixes the 3 issues which I have described below.
>>>> The race condition fix requires the use of a read-write lock.
>>>>
>>>> 2) The Jira is: https://issues.apache.org/jira/browse/SPARK-51016
>>>>
>>>> A checksum approach would result in a greater number of retries, I
>>>> believe; there is already a mechanism in Expression to identify
>>>> indeterminism.
>>>>
>>>> Anyway, the issues are, I think, multiple, and the opened PR addresses them:
>>>> 1) stage.isIndeterminate is buggy, as it is not able to identify
>>>> indeterminate stages, and the fix spans several files, such as
>>>> ShuffleDependency, RDD, etc.
>>>> 2) The race condition in the DAGScheduler (a sketch follows this list):
>>>> a) Say task1, corresponding to partition P1 of an indeterminate result
>>>> stage, fails with a fetch failure.
>>>> b) The code rightly checks that no previous partitions of the result stage
>>>> are successful yet; instead of aborting with an exception, it decides to
>>>> retry.
>>>> c) During this retry window, between checking the dependencies and making
>>>> the new attempt initiated by task1, task2, corresponding to partition P2 of
>>>> the result stage, becomes successful.
>>>> d) The new attempt now sees n - 1 missing partitions instead of all of
>>>> them, and that creates data corruption, as it is no longer retrying all
>>>> partitions.
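>>>>
>>>> To make the locking idea concrete, here is a minimal sketch (the names
>>>> are illustrative only, not the actual DAGScheduler code): task completions
>>>> share the read lock, while the retry decision takes the write lock, so the
>>>> "zero partitions succeeded?" check and the retry submission happen
>>>> atomically with respect to completions.
>>>>
>>>>   import java.util.concurrent.ConcurrentHashMap
>>>>   import java.util.concurrent.locks.ReentrantReadWriteLock
>>>>
>>>>   // Illustrative sketch only, not Spark's DAGScheduler.
>>>>   class ResultStageState(numPartitions: Int) {
>>>>     private val rwLock = new ReentrantReadWriteLock()
>>>>     private val succeeded = ConcurrentHashMap.newKeySet[Int]()
>>>>
>>>>     // Task-success path: completions may run concurrently, so they share
>>>>     // the read lock; it only excludes the retry decision below.
>>>>     def markPartitionSuccess(partition: Int): Unit = {
>>>>       val r = rwLock.readLock(); r.lock()
>>>>       try succeeded.add(partition) finally r.unlock()
>>>>     }
>>>>
>>>>     // Fetch-failure path: the emptiness check and the decision to retry
>>>>     // all partitions run under the write lock, closing the race window.
>>>>     def decideRetry(): Either[String, Seq[Int]] = {
>>>>       val w = rwLock.writeLock(); w.lock()
>>>>       try {
>>>>         if (succeeded.isEmpty) Right(0 until numPartitions) // retry all
>>>>         else Left("abort: indeterminate stage already partially committed")
>>>>       } finally w.unlock()
>>>>     }
>>>>   }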
>>>>
>>>> 3) The DAGSchedulerSuite tests:
>>>> a) test("SPARK-25341: retry all the succeeding stages when the map stage
>>>> is indeterminate")
>>>> has the following assertion:
>>>>
>>>>   assert(failedStages.collect {
>>>>     case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>>   }.head.findMissingPartitions() == Seq(0))
>>>>
>>>> If my understanding is right, and as seen from the outcome of the opened
>>>> PR, since the ShuffleMapStage turns out to be indeterminate, all tasks
>>>> should be retried, so the expected value in the assertion should be
>>>> Seq(0, 1).
>>>>
>>>> b) If the above #a is valid, then the second test,
>>>> test("SPARK-32923: handle stage failure for indeterminate map stage with
>>>> push-based shuffle"),
>>>> which has the assertion:
>>>>
>>>>   assert(failedStages.collect {
>>>>     case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>>   }.head.findMissingPartitions() == Seq(0))
>>>>
>>>> should also expect Seq(0, 1) instead.
>>>>
>>>> I am also attaching a patch which I was using as a basis to debug and fix.
>>>> The patch modifies the source code, using inline code interception and
>>>> tweaks, to reproduce the data-corruption issue in a unit test.
>>>> And I am also attaching the BugTest which I used.
>>>> The test is supposed to pass if it either encounters the exception from
>>>> the Spark layer, "Please eliminate the indeterminacy by checkpointing the
>>>> RDD before repartition and try again", or the results of the retry are as
>>>> expected (12 rows, along with a data-correctness check).
>>>> Meanwhile I will try to productize the bug test.
>>>>
>>>> Would appreciate clarification on test issue #3. And a review of the PR,
>>>> if done, would be appreciated much more.
>>>>
>>>> Santosh,
>>>> I believe my PR would do a full partition retry (if the fetch failure
>>>> happened prior to any other final task being successful; otherwise it will
>>>> abort the query by throwing an exception). No wrong results, I believe.
>>>> Your review will be much appreciated.
>>>> Regards
>>>> Asif
>>>>
>>>> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>
>>>>> > Have identified a race condition in the current DAGScheduler code base
>>>>>
>>>>> Does it lead to the failure of full-stage retry? Can you also share your
>>>>> PR so that we can take a closer look?
>>>>>
>>>>> BTW, I'm working with my colleagues on this runtime checksum idea. We
>>>>> have something similar in our internal infra and I think we can create a
>>>>> working solution shortly.
>>>>>
>>>>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <santosh.ping...@adyen.com> wrote:
>>>>>
>>>>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>>>> but the data becomes different (e.g. use checksum to check it), then
>>>>>> Spark should retry all the partitions of this stage.
>>>>>>
>>>>>> Having been bitten hard recently by this behavior in Spark, and after
>>>>>> having gone down the rabbit hole to investigate, we should definitely
>>>>>> try to refine and implement this approach. The non-determinism depends
>>>>>> on several factors, including the data types of the columns or the use
>>>>>> of repartition; it is not easy for users to reason about this behavior.
>>>>>>
>>>>>> PS: The JIRA that I found, and which helped me confirm my understanding
>>>>>> of the problem, plus educate internal users and the platform team:
>>>>>> https://issues.apache.org/jira/browse/SPARK-38388. The checksum approach
>>>>>> was brought up in that JIRA too, and I feel that is the balanced way to
>>>>>> look at this problem.
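>>>>>>
>>>>>> To make the runtime idea concrete, here is a rough sketch of how I
>>>>>> picture it (purely illustrative; none of these names exist in Spark
>>>>>> today): each map task records an order-insensitive checksum of its
>>>>>> output, and a mismatch on a retried attempt tells the scheduler the
>>>>>> stage output is not reproducible, so all partitions must be rolled back.
>>>>>>
>>>>>>   import java.util.zip.CRC32
>>>>>>
>>>>>>   // Illustrative sketch only; not an existing Spark API.
>>>>>>   object ShuffleOutputCheck {
>>>>>>     // XOR per-record CRCs so the checksum does not depend on the order
>>>>>>     // in which records are produced across attempts. (A real
>>>>>>     // implementation would need to handle duplicate records as well.)
>>>>>>     def checksum(records: Iterator[Array[Byte]]): Long = {
>>>>>>       var acc = 0L
>>>>>>       records.foreach { rec =>
>>>>>>         val crc = new CRC32()
>>>>>>         crc.update(rec)
>>>>>>         acc ^= crc.getValue
>>>>>>       }
>>>>>>       acc
>>>>>>     }
>>>>>>
>>>>>>     // A retried map task whose checksum differs from the recorded one
>>>>>>     // proves the stage is indeterminate: retry every partition.
>>>>>>     def mustRetryWholeStage(recorded: Option[Long], current: Long): Boolean =
>>>>>>       recorded.exists(_ != current)
>>>>>>   }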
>>>>>>
>>>>>> Kind regards
>>>>>> Santosh
>>>>>>
>>>>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Following up on the issue with some information:
>>>>>>> 1) I have opened a PR to fix the isIndeterminate method in Stage, RDD,
>>>>>>> etc.
>>>>>>> 2) After tweaking the source code, I have been able to reproduce the
>>>>>>> data loss issue reliably, but cannot productize the test, as it requires
>>>>>>> a lot of inline interception and coaxing to induce the failure in a
>>>>>>> single-VM unit test. I will post a diff of the source-code patch and the
>>>>>>> bug test.
>>>>>>> 3) I have identified a race condition in the current DAGScheduler code
>>>>>>> base:
>>>>>>> In the current DAGScheduler code, in the function *handleTaskCompletion*,
>>>>>>> when a task fails, while resubmitting the failure as
>>>>>>> ResubmitFailedStages, prior to submitting, it checks whether the shuffle
>>>>>>> is indeterminate. If it is, the stage is resubmitted ONLY IF the
>>>>>>> ResultStage has ZERO partitions processed. Which is right. But the
>>>>>>> operation is not atomic up to the point where missingPartitions is
>>>>>>> invoked on the stage. As a result, it is possible that one of the tasks
>>>>>>> from the current attempt marks one of the partitions as successful, and
>>>>>>> when the new attempt is made, instead of getting all partitions as
>>>>>>> missing, it fetches only some of them.
>>>>>>> Will be modifying the PR to fix the race.
>>>>>>> Regards
>>>>>>> Asif
>>>>>>>
>>>>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Shouldn't it be possible to determine statically whether the output
>>>>>>>> will be deterministic? Expressions already have a deterministic flag,
>>>>>>>> so when an attribute is created from an alias, it should be possible to
>>>>>>>> know whether the attribute is pointing to an indeterminate component.
>>>>>>>>
>>>>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> It looks like a hard problem to statically analyze the query plan and
>>>>>>>>> decide whether a Spark stage is deterministic or not. When I added RDD
>>>>>>>>> DeterministicLevel, I thought it was not a hard problem for the
>>>>>>>>> callers to specify it, but it seems I was wrong.
>>>>>>>>>
>>>>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle stage but
>>>>>>>>> the data becomes different (e.g. use a checksum to check it), then
>>>>>>>>> Spark should retry all the partitions of this stage. I'll look into
>>>>>>>>> this repro after I'm back from the national holiday.
>>>>>>>>>
>>>>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I am using the test below. As a unit test it will pass, since
>>>>>>>>>> simulating executor loss in a single VM is difficult, but there is
>>>>>>>>>> definitely a bug.
>>>>>>>>>> Using the debugger you can check that ShuffleStage.isDeterminate
>>>>>>>>>> turns out to be true, though it clearly should not be.
>>>>>>>>>> As a result, if you look at the DAGScheduler, TaskSchedulerImpl and
>>>>>>>>>> TaskSet code, it will not retry the ShuffleStage fully; instead it
>>>>>>>>>> will only retry the missing tasks.
>>>>>>>>>> Which means that on retry, if the join used a random value which
>>>>>>>>>> puts a row in an already completed shuffle task, the results will be
>>>>>>>>>> missed.
>>>>>>>>>>
>>>>>>>>>>> package org.apache.spark.sql.vectorized
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>>>>>>
>>>>>>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>>>>>>   import testImplicits._
>>>>>>>>>>>
>>>>>>>>>>>   /* test("no retries") {
>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>       val baseDf = spark.createDataset(
>>>>>>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"),
>>>>>>>>>>>           (null, "y"), (null, "z")))(
>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>
>>>>>>>>>>>       val leftOuter = baseDf.select(
>>>>>>>>>>>         $"strleft", when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
>>>>>>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>>>>>>       leftOuter.show(10000)
>>>>>>>>>>>
>>>>>>>>>>>       val innerRight = spark.createDataset(
>>>>>>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>
>>>>>>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
>>>>>>>>>>>       innerjoin.show(1000)
>>>>>>>>>>>
>>>>>>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>       outerjoin.show(1000)
>>>>>>>>>>>     }
>>>>>>>>>>>   } */
>>>>>>>>>>>
>>>>>>>>>>>   test("with retries") {
>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>       withTable("outer", "inner") {
>>>>>>>>>>>         createBaseTables()
>>>>>>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>>>>>>
>>>>>>>>>>>         println("Initial data")
>>>>>>>>>>>         outerjoin.show(1000)
>>>>>>>>>>>         val correctRows = outerjoin.collect()
>>>>>>>>>>>         for (i <- 0 until 100) {
>>>>>>>>>>>           // Flags added by the debug patch: fail the zipped-partitions
>>>>>>>>>>>           // RDD to force a fetch-failure retry of the shuffle stage.
>>>>>>>>>>>           FileScanRDD.throwException = false
>>>>>>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>>>>>>           val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>>>>>>>           println("after retry data")
>>>>>>>>>>>           temp.show(1000)
>>>>>>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>>>>>>           // Match every pre-failure row against the retried result
>>>>>>>>>>>           // set, removing each match so duplicates are caught too.
>>>>>>>>>>>           correctRows.foreach(r => {
>>>>>>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>>>>>>                 (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && r.isNullAt(2) &&
>>>>>>>>>>>                   x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
>>>>>>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
>>>>>>>>>>>             assert(index >= 0)
>>>>>>>>>>>             retriedResults.remove(index)
>>>>>>>>>>>           })
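>>>>>>>>>>>           // Every expected row must have been matched and removed
>>>>>>>>>>>           // exactly once; leftovers mean the retry produced extra rows.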
>>>>>>>>>>>           assert(retriedResults.isEmpty)
>>>>>>>>>>>         }
>>>>>>>>>>>         // Thread.sleep(10000000)
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   private def createBaseTables(): Unit = {
>>>>>>>>>>>     /* val outerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>     outerDf.write.format("parquet").saveAsTable("outer") */
>>>>>>>>>>>
>>>>>>>>>>>     val outerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>>>>>>     outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>>>>>>
>>>>>>>>>>>     /* val innerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright") */
>>>>>>>>>>>
>>>>>>>>>>>     val innerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>     innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>>>>>>
>>>>>>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>>>>>>     // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   private def getOuterJoinDF = {
>>>>>>>>>>>     // Null join keys are salted with a random value, so the hash
>>>>>>>>>>>     // partitioning of the shuffle is indeterminate across attempts.
>>>>>>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>>>>>>       $"strleft", when(isnull($"pkLeftt"),
>>>>>>>>>>>         floor(rand() * Literal(10000000L)).cast(LongType)).
>>>>>>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>>>>>>
>>>>>>>>>>>     val innerRight = spark.table("inner")
>>>>>>>>>>>     // val innerinnerRight = spark.table("innerinner")
>>>>>>>>>>>
>>>>>>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>     outerjoin
>>>>>>>>>>>     /*
>>>>>>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>>>>>>     outerOuterJoin
>>>>>>>>>>>     */
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sure, I will send a prototypical query tomorrow. Though it is
>>>>>>>>>>> difficult to simulate the issue using a unit test, I think the issue
>>>>>>>>>>> is that Rdd.isIndeterminate is not returning true for the query. As
>>>>>>>>>>> a result, on retry, the shuffle stage is not reattempted fully.
>>>>>>>>>>> And the RDD not returning indeterminate as true is due to
>>>>>>>>>>> ShuffledRDD not taking it into account, as the ShuffleDependency
>>>>>>>>>>> does not take into account the indeterminate nature of the
>>>>>>>>>>> HashPartitioner.
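>>>>>>>>>>> A rough sketch of the kind of change I mean (simplified shapes, not
>>>>>>>>>>> the actual Spark classes): the dependency's output determinism
>>>>>>>>>>> should degrade to INDETERMINATE when the partitioner hashes an
>>>>>>>>>>> indeterminate expression, even if the RDD's own computation is
>>>>>>>>>>> deterministic.
>>>>>>>>>>>
>>>>>>>>>>>   // Simplified sketch only; not the actual Spark classes.
>>>>>>>>>>>   object DeterministicLevel extends Enumeration {
>>>>>>>>>>>     val DETERMINATE, UNORDERED, INDETERMINATE = Value
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   // Whether the partitioning keys derive from something like rand().
>>>>>>>>>>>   trait KeyedPartitioner {
>>>>>>>>>>>     def isKeyIndeterminate: Boolean
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   class ShuffleDep(rddLevel: DeterministicLevel.Value,
>>>>>>>>>>>       partitioner: KeyedPartitioner) {
>>>>>>>>>>>     // An indeterminate partitioner makes shuffle output placement
>>>>>>>>>>>     // unstable across attempts, regardless of the RDD's own level.
>>>>>>>>>>>     def outputDeterministicLevel: DeterministicLevel.Value =
>>>>>>>>>>>       if (partitioner.isKeyIndeterminate) DeterministicLevel.INDETERMINATE
>>>>>>>>>>>       else rddLevel
>>>>>>>>>>>   }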
>>>>>>>>>>> Moreover, the attribute ref provided to the HashPartitioner, though
>>>>>>>>>>> pointing to an indeterminate alias, has its deterministic flag set
>>>>>>>>>>> to true (which in itself is logically correct).
>>>>>>>>>>> So I feel that, apart from the ShuffleDependency code change, all
>>>>>>>>>>> expressions should have a lazy boolean, say
>>>>>>>>>>> containsIndeterministicComponent, which will be true if the
>>>>>>>>>>> expression itself is indeterministic or contains any attribute ref
>>>>>>>>>>> whose containsIndeterministicComponent is true.
>>>>>>>>>>>
>>>>>>>>>>> And on a personal note: thanks for your interest; that is a very
>>>>>>>>>>> rare attitude.
>>>>>>>>>>> Regards
>>>>>>>>>>> Asif
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <angel.alvarez.pas...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Asif,
>>>>>>>>>>>>
>>>>>>>>>>>> Could you provide an example (code + dataset) to analyze this?
>>>>>>>>>>>> Looks interesting...
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Ángel
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 26, 2025 at 20:58, Asif Shahid (<asif.sha...@gmail.com>) wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> On further thought, I concur that leaf expressions like
>>>>>>>>>>>>> AttributeRefs can always be considered deterministic, since, as a
>>>>>>>>>>>>> Java variable, the value contained in one is invariant within an
>>>>>>>>>>>>> iteration (except when changed by some deterministic logic). So in
>>>>>>>>>>>>> that sense, what I called an issue in the mail above is incorrect.
>>>>>>>>>>>>> But I think AttributeRef should have a boolean method which tells
>>>>>>>>>>>>> whether the value it represents comes from an indeterminate source
>>>>>>>>>>>>> or not.
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <asif.sha...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> While testing a use case, the query had an outer join such that
>>>>>>>>>>>>>> the joining key of the left outer table either had a valid value
>>>>>>>>>>>>>> or a random value (salting to avoid skew). The case was reported
>>>>>>>>>>>>>> to produce incorrect results on node failure, with retry.
>>>>>>>>>>>>>> On debugging the code, I have found the following, which has left
>>>>>>>>>>>>>> me confused as to what Spark's strategy is for indeterministic
>>>>>>>>>>>>>> fields.
>>>>>>>>>>>>>> Some serious issues are:
>>>>>>>>>>>>>> 1) All the leaf expressions, like AttributeReference, are always
>>>>>>>>>>>>>> considered deterministic. This means that if an attribute points
>>>>>>>>>>>>>> to an Alias which is itself indeterministic, the attribute will
>>>>>>>>>>>>>> still be considered deterministic.
>>>>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>>>>>>> operator supports indeterministic values. Join is not included in
>>>>>>>>>>>>>> the list of supported operators, but it passes even if the
>>>>>>>>>>>>>> joining key points to an indeterministic alias
>>>>>>>>>>>>>> (when I tried fixing it, I found a plethora of operators failing,
>>>>>>>>>>>>>> like DeserializedObject, LocalRelation, etc., which are not
>>>>>>>>>>>>>> supposed to contain indeterministic attributes, because they are
>>>>>>>>>>>>>> not in the list of supporting operators).
>>>>>>>>>>>>>> 3) The ShuffleDependency does not check the indeterministic
>>>>>>>>>>>>>> nature of the partitioner (I fixed this locally and then realized
>>>>>>>>>>>>>> that bug #1 needs to be fixed too).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The code in DAGScheduler / TaskSet / TaskScheduler etc. seems to
>>>>>>>>>>>>>> have been written keeping in mind the indeterministic nature of
>>>>>>>>>>>>>> the previous and current stages, so as to re-execute previous
>>>>>>>>>>>>>> stages as a whole instead of just the missing tasks, but the
>>>>>>>>>>>>>> above 3 points do not seem to support the code of DAGScheduler /
>>>>>>>>>>>>>> TaskScheduler.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> Asif