Hi,
OK, I have done the final check-in. Please feel free to review.
Regards
Asif

On Sat, Feb 15, 2025 at 6:42 PM Asif Shahid <asif.sha...@gmail.com> wrote:

> Please hold off on reviewing the patch, as I need to do one more check-in.
> I have still left a race window by releasing the read lock early, in the
> case where the first task has succeeded and the failing task has yet to
> acquire the write lock.
> Regards
> Asif
>
> On Thu, Feb 13, 2025 at 10:48 PM Asif Shahid <asif.sha...@gmail.com>
> wrote:
>
>> Hi Wenchen,
>> Apologies, I thought it was Santosh asking about the PR and the race
>> condition.
>> Yes, the race condition causes only some of the partitions to be retried.
>> Maybe a checksum can be used as a fallback mechanism?
>> I suppose the checksum would be computed on the executor side? Then there
>> might be a situation where shuffle files are lost before the checksum is
>> communicated to the driver/executors?
>> Regards
>> Asif
>>
>>
>> On Thu, Feb 13, 2025 at 7:39 PM Asif Shahid <asif.sha...@gmail.com>
>> wrote:
>>
>>> The bugrepro patch, when applied on current master, shows a failure
>>> with incorrect results, while on the PR branch it passes.
>>> The number of iterations in the test is 100.
>>>
>>> Regards
>>> Asif
>>>
>>> On Thu, Feb 13, 2025 at 7:35 PM Asif Shahid <asif.sha...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> Following up on this issue.
>>>> The PR opened is:
>>>> 1) https://github.com/apache/spark/pull/49708
>>>> The PR, IMO, fixes the 3 issues which I have described below.
>>>> The race condition fix requires the use of a read-write lock.
>>>>
>>>> 2) The Jira is:
>>>> https://issues.apache.org/jira/browse/SPARK-51016
>>>>
>>>> A checksum approach would result in a greater number of retries, I
>>>> believe. There is already a mechanism in Expression to identify
>>>> indeterminism.
>>>>
>>>> Anyway, I think there are multiple issues, which the opened PR
>>>> addresses.
>>>> 1) stage.isIndeterminate is buggy, as it fails to identify an
>>>> indeterminate stage; the fix spans several files, such as
>>>> ShuffleDependency, RDD, etc.
>>>> 2) The race condition in DagScheduler:
>>>>    a) Say task1, corresponding to partition P1 of an indeterminate
>>>> result stage, fails with a fetch failure.
>>>>    b) The code rightly checks whether any partition of the result stage
>>>> has already succeeded; if none has, then instead of aborting with an
>>>> exception, it decides to retry.
>>>>    c) During this retry window, while dependencies are being checked and
>>>> the new attempt initiated by task1 is being made, task2, corresponding to
>>>> partition 2 of the result stage, succeeds.
>>>>    d) The new attempt now sees n - 1 missing partitions instead of all
>>>> of them, and that causes data corruption, as it is no longer retrying
>>>> all partitions.
>>>>
>>>> 3) The DagSchedulerSuite tests:
>>>>    a) test("SPARK-25341: retry all the succeeding stages when the map stage is indeterminate")
>>>> has the following assertion:
>>>>
>>>> assert(failedStages.collect {
>>>>   case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>> }.head.findMissingPartitions() == Seq(0))
>>>>
>>>> If my understanding is right, and as seen from the outcome of the opened
>>>> PR, since the ShuffleMapStage turns out to be indeterminate, all tasks
>>>> should be retried, so the expected value in the assertion should be
>>>> Seq(0, 1).
>>>>
>>>>    b) If #a above is valid, then the second test,
>>>> test("SPARK-32923: handle stage failure for indeterminate map stage with push-based shuffle"),
>>>> which has the assertion:
>>>>
>>>> assert(failedStages.collect {
>>>>   case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
>>>> }.head.findMissingPartitions() == Seq(0))
>>>>
>>>> should also expect Seq(0, 1) instead.
>>>>
>>>>
>>>> I am also attaching a patch which I was using as a basis to debug and fix.
>>>> The patch modifies the source code, using inline code interception
>>>> and tweaks, to reproduce the data corruption issue with a unit test.
>>>>
>>>> I am also attaching the BugTest which I used.
>>>>
>>>> The test is supposed to pass if it either encounters the exception from
>>>> the Spark layer with the message
>>>> "Please eliminate the indeterminacy by checkpointing the RDD before
>>>> repartition and try again"
>>>> or if the results of the retry are as expected (12 rows, along with a
>>>> data correctness check).
>>>>
>>>> Meanwhile, I will try to productize the bug test.
>>>>
>>>> I would appreciate clarification on test issue #3. A review of the PR
>>>> would be appreciated even more.
>>>>
>>>> Santosh,
>>>> I believe my PR does a full partition retry (if the fetch failure
>>>> happened before any other final task succeeded; otherwise it aborts the
>>>> query by throwing an exception). No wrong results, I believe.
>>>> Your review will be much appreciated.
>>>> Regards
>>>> Asif
>>>>
>>>>
>>>> On Thu, Feb 13, 2025 at 4:54 PM Wenchen Fan <cloud0...@gmail.com>
>>>> wrote:
>>>>
>>>>> > Have identified a race condition in the current DagScheduler code base
>>>>>
>>>>> Does it lead to the failure of full-stage retry? Can you also share
>>>>> your PR so that we can take a closer look?
>>>>>
>>>>> BTW, I'm working with my colleagues on this runtime checksum idea. We
>>>>> have something similar in our internal infra and I think we can create a
>>>>> working solution shortly.
>>>>>
>>>>> On Thu, Feb 13, 2025 at 12:53 PM Santosh Pingale <
>>>>> santosh.ping...@adyen.com> wrote:
>>>>>
>>>>>> > Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>>>> > but the data becomes different (e.g. use checksum to check it), then
>>>>>> > Spark should retry all the partitions of this stage.
>>>>>>
>>>>>> Having been bitten hard recently by this behavior in Spark, and after
>>>>>> going down the rabbit hole to investigate, I think we should definitely
>>>>>> try to refine and implement this approach. The non-determinism depends
>>>>>> on several factors, including the data types of the columns or the use
>>>>>> of repartition; it is not easy for users to reason about this behavior.
>>>>>>
>>>>>> PS: The JIRA that I found, which helped me confirm my understanding of
>>>>>> the problem and educate internal users and the platform team:
>>>>>> https://issues.apache.org/jira/browse/SPARK-38388. The checksum approach
>>>>>> was brought up in that JIRA too, and I feel it is the balanced way to
>>>>>> look at this problem.
>>>>>>
>>>>>> Kind regards
>>>>>> Santosh
>>>>>>
>>>>>> On Thu, Feb 13, 2025, 12:41 AM Asif Shahid <asif.sha...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Following up on the issue with some information:
>>>>>>> 1) I have opened a PR to fix the isInDeterminate method in
>>>>>>> Stage, RDD, etc.
>>>>>>> 2) After tweaking the source code, I have been able to reproduce the
>>>>>>> data loss issue reliably, but cannot productize the test, as it
>>>>>>> requires a lot of inline interception and coaxing to induce the test
>>>>>>> failure in a single-VM unit test. I will post a diff of the source code
>>>>>>> patch and the bug test.
>>>>>>> 3) I have identified a race condition in the current
>>>>>>> DagScheduler code base:
>>>>>>>     In the current DagScheduler code, in the function
>>>>>>> *handleTaskCompletion*, when a task fails, before resubmitting the
>>>>>>> failure as ResubmitFailedStage, it checks whether the shuffle is
>>>>>>> indeterminate. If it is, then the stage is submitted ONLY IF the
>>>>>>> ResultStage has ZERO partitions processed. That is right, but the check
>>>>>>> is not atomic with the later point where missingPartitions is invoked
>>>>>>> on the stage. As a result, it is possible that one of the tasks from the
>>>>>>> current attempt marks one of the partitions as successful, and when the
>>>>>>> new attempt is made, instead of getting all partitions as missing, it
>>>>>>> gets only some of them.
>>>>>>> I will be modifying the PR to fix the race.
>>>>>>> Regards
>>>>>>> Asif
>>>>>>>
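
The check-then-act race described in point 3 above can be sketched without Spark. This is only an illustration under stated assumptions: ToyStage, markSuccessful and planRetry are hypothetical names, not DAGScheduler APIs and not the code in the PR. Task completions take the read lock and the fetch-failure path takes the write lock, so the "no partition has succeeded yet" check and the decision to retry every partition happen as one atomic step.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical stand-in for a result stage; this is NOT Spark's ResultStage.
final class ToyStage(val numPartitions: Int) {
  private val lock = new ReentrantReadWriteLock()
  private val finished = mutable.Set.empty[Int]
  private var retryPlanned = false // written under the write lock, read under the read lock

  // Task-completion path: many tasks may finish concurrently (shared read lock),
  // but none can finish while a retry decision is in progress.
  def markSuccessful(partition: Int): Boolean = {
    lock.readLock().lock()
    try {
      if (retryPlanned) false // late result is ignored; the partition will be recomputed
      else finished.synchronized { finished += partition; true }
    } finally lock.readLock().unlock()
  }

  // Fetch-failure path: the check and the decision happen under the exclusive write lock.
  // Returns all partitions to recompute, or None if the caller has to abort.
  def planRetry(): Option[Seq[Int]] = {
    lock.writeLock().lock()
    try {
      if (finished.synchronized(finished.nonEmpty)) None
      else {
        retryPlanned = true
        Some(0 until numPartitions)
      }
    } finally lock.writeLock().unlock()
  }
}
```

With this shape, a task that completes after planRetry() has returned cannot shrink the set of partitions seen by the new attempt.
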
>>>>>>> On Sun, Jan 26, 2025 at 11:19 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Shouldn't it be possible to determine from static data whether the
>>>>>>>> output will be deterministic? Expressions already have a deterministic
>>>>>>>> flag. So when an attribute is created from an alias, it will be possible
>>>>>>>> to know whether the attribute is pointing to an indeterminate component.
>>>>>>>>
>>>>>>>> On Sun, Jan 26, 2025, 11:09 PM Wenchen Fan <cloud0...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> It looks like a hard problem to statically analyze the query plan
>>>>>>>>> and decide whether a Spark stage is deterministic or not. When I added
>>>>>>>>> RDD DeterministicLevel, I thought it was not a hard problem for the
>>>>>>>>> callers to specify it, but it seems I was wrong.
>>>>>>>>>
>>>>>>>>> Maybe we should do it at runtime: if Spark retries a shuffle stage
>>>>>>>>> but the data becomes different (e.g. use checksum to check it), then
>>>>>>>>> Spark should retry all the partitions of this stage. I'll look into this
>>>>>>>>> repro after I'm back from the national holiday.
>>>>>>>>>
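
A rough sketch of the runtime checksum idea above, assuming each attempt's output can be summarized row by row. ChecksumRetry, outputChecksum and partitionsToRetry are hypothetical names, and the order-insensitive sum of CRC32 values is an assumption made for the illustration, not something specified in this thread or implemented in Spark.

```scala
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

object ChecksumRetry {
  // Order-insensitive checksum of one attempt's output: the sum of per-row CRC32 values.
  def outputChecksum(rows: Seq[String]): Long =
    rows.map { row =>
      val crc = new CRC32()
      crc.update(row.getBytes(StandardCharsets.UTF_8))
      crc.getValue
    }.sum

  // If the retried output differs from the first attempt, recompute every partition;
  // otherwise recomputing only the failed partitions is enough.
  def partitionsToRetry(
      firstAttempt: Seq[String],
      retried: Seq[String],
      failed: Seq[Int],
      all: Seq[Int]): Seq[Int] =
    if (outputChecksum(firstAttempt) == outputChecksum(retried)) failed else all
}
```

For example, ChecksumRetry.partitionsToRetry(Seq("a", "b"), Seq("b", "a"), Seq(0), Seq(0, 1)) keeps the retry limited to partition 0, because the same rows in a different order give the same checksum.
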
>>>>>>>>> On Mon, Jan 27, 2025 at 2:16 PM Asif Shahid <asif.sha...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I am using the test below. As a unit test, though, it will pass, as
>>>>>>>>>> simulating a lost executor in a single VM is difficult, but there is
>>>>>>>>>> definitely a bug.
>>>>>>>>>> If you check using a debugger, ShuffleStage.isDeterminate
>>>>>>>>>> turns out to be true, though it clearly should not be.
>>>>>>>>>> As a result, if you look at the DagScheduler, TaskSchedulerImpl and
>>>>>>>>>> TaskSet code, it will not retry the ShuffleStage fully; instead it
>>>>>>>>>> would only retry the missing tasks.
>>>>>>>>>> This means that on retry, if the join used a random value which puts
>>>>>>>>>> a row in an already completed shuffle task, the results will be missed.
>>>>>>>>>>
>>>>>>>>>>> package org.apache.spark.sql.vectorized
>>>>>>>>>>>
>>>>>>>>>>> import org.apache.spark.rdd.ZippedPartitionsRDD2
>>>>>>>>>>> import org.apache.spark.sql.{DataFrame, Encoders, QueryTest}
>>>>>>>>>>> import org.apache.spark.sql.catalyst.expressions.Literal
>>>>>>>>>>> import org.apache.spark.sql.execution.datasources.FileScanRDD
>>>>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>>>> import org.apache.spark.sql.internal.SQLConf
>>>>>>>>>>> import org.apache.spark.sql.test.SharedSparkSession
>>>>>>>>>>> import org.apache.spark.sql.types.LongType
>>>>>>>>>>>
>>>>>>>>>>> class BugTest extends QueryTest with SharedSparkSession {
>>>>>>>>>>>   import testImplicits._
>>>>>>>>>>>   /* test("no retries") {
>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>       val baseDf = spark.createDataset(
>>>>>>>>>>>         Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), (null, "y"), (null, "z")))(
>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>
>>>>>>>>>>>       val leftOuter = baseDf.select(
>>>>>>>>>>>         $"strleft", when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
>>>>>>>>>>>           otherwise($"pkLeft").as("pkLeft"))
>>>>>>>>>>>       leftOuter.show(10000)
>>>>>>>>>>>
>>>>>>>>>>>       val innerRight = spark.createDataset(
>>>>>>>>>>>         Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>         Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>
>>>>>>>>>>>       val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
>>>>>>>>>>>
>>>>>>>>>>>       innerjoin.show(1000)
>>>>>>>>>>>
>>>>>>>>>>>       val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>
>>>>>>>>>>>       outerjoin.show(1000)
>>>>>>>>>>>     }
>>>>>>>>>>>   } */
>>>>>>>>>>>
>>>>>>>>>>>   test("with retries") {
>>>>>>>>>>>     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
>>>>>>>>>>>       withTable("outer", "inner") {
>>>>>>>>>>>         createBaseTables()
>>>>>>>>>>>         val outerjoin: DataFrame = getOuterJoinDF
>>>>>>>>>>>
>>>>>>>>>>>         println("Initial data")
>>>>>>>>>>>         outerjoin.show(1000)
>>>>>>>>>>>         val correctRows = outerjoin.collect()
>>>>>>>>>>>         for (i <- 0 until 100) {
>>>>>>>>>>>           // The throwException flags are not part of stock Spark; they
>>>>>>>>>>>           // presumably come from the attached repro patch.
>>>>>>>>>>>           FileScanRDD.throwException = false
>>>>>>>>>>>           ZippedPartitionsRDD2.throwException = true
>>>>>>>>>>>           val rowsAfterRetry = getOuterJoinDF.collect()
>>>>>>>>>>>           import scala.jdk.CollectionConverters._
>>>>>>>>>>>           val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
>>>>>>>>>>>           println("after retry data")
>>>>>>>>>>>           temp.show(1000)
>>>>>>>>>>>           assert(correctRows.length == rowsAfterRetry.length)
>>>>>>>>>>>           val retriedResults = rowsAfterRetry.toBuffer
>>>>>>>>>>>           // Match every expected row against one retried row, then remove it.
>>>>>>>>>>>           correctRows.foreach(r => {
>>>>>>>>>>>             val index = retriedResults.indexWhere(x =>
>>>>>>>>>>>               r.getString(0) == x.getString(0) &&
>>>>>>>>>>>                 (r.getLong(1) == x.getLong(1) || (x.isNullAt(2) && r.isNullAt(2) &&
>>>>>>>>>>>                   x.isNullAt(3) && r.isNullAt(3))) &&
>>>>>>>>>>>                 ((r.isNullAt(2) && x.isNullAt(2)) || r.getLong(2) == x.getLong(2)) &&
>>>>>>>>>>>                 ((r.isNullAt(3) && x.isNullAt(3)) || r.getString(3) == x.getString(3)))
>>>>>>>>>>>             assert(index >= 0)
>>>>>>>>>>>             retriedResults.remove(index)
>>>>>>>>>>>           })
>>>>>>>>>>>           assert(retriedResults.isEmpty)
>>>>>>>>>>>         }
>>>>>>>>>>>
>>>>>>>>>>>         // Thread.sleep(10000000)
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   private def createBaseTables(): Unit = {
>>>>>>>>>>>     /*val outerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
>>>>>>>>>>>     outerDf.write.format("parquet").saveAsTable("outer")*/
>>>>>>>>>>>
>>>>>>>>>>>     val outerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "aa"), (null, "aa"), (2L, "aa"), (null, "bb"), (3L, "bb"), (null, "bb")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
>>>>>>>>>>>     outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
>>>>>>>>>>>
>>>>>>>>>>>     /*val innerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "11"), (2L, "22"), (3L, "33")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")*/
>>>>>>>>>>>
>>>>>>>>>>>     val innerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "11"), (2L, "11"), (3L, "33")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
>>>>>>>>>>>
>>>>>>>>>>>     innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
>>>>>>>>>>>
>>>>>>>>>>>     val innerInnerDf = spark.createDataset(
>>>>>>>>>>>       Seq((1L, "111"), (2L, "222"), (3L, "333")))(
>>>>>>>>>>>       Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkpkRight", "strstrright")
>>>>>>>>>>>
>>>>>>>>>>>     // innerInnerDf.write.format("parquet").saveAsTable("innerinner")
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>>   private def getOuterJoinDF = {
>>>>>>>>>>>     val leftOuter = spark.table("outer").select(
>>>>>>>>>>>       $"strleft", when(isnull($"pkLeftt"), floor(rand() * Literal(10000000L)).
>>>>>>>>>>>         cast(LongType)).
>>>>>>>>>>>         otherwise($"pkLeftt").as("pkLeft"))
>>>>>>>>>>>
>>>>>>>>>>>     val innerRight = spark.table("inner")
>>>>>>>>>>>     // val innerinnerRight = spark.table("innerinner")
>>>>>>>>>>>
>>>>>>>>>>>     val outerjoin = leftOuter.hint("shuffle_hash").
>>>>>>>>>>>       join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
>>>>>>>>>>>     outerjoin
>>>>>>>>>>>     /*
>>>>>>>>>>>     val outerOuterJoin = outerjoin.hint("shuffle_hash").
>>>>>>>>>>>       join(innerinnerRight, $"pkLeft" === $"pkpkRight", "left_outer")
>>>>>>>>>>>     outerOuterJoin
>>>>>>>>>>>
>>>>>>>>>>>      */
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> On Sun, Jan 26, 2025 at 10:05 PM Asif Shahid <
>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sure, I will send a prototypical query tomorrow. Though it is
>>>>>>>>>>> difficult to simulate the issue in a unit test, I think the issue is
>>>>>>>>>>> that Rdd.isIndeterminate is not returning true for the query. As a
>>>>>>>>>>> result, on retry, the shuffle stage is not reattempted fully.
>>>>>>>>>>> The RDD does not report itself as indeterminate because ShuffleRdd
>>>>>>>>>>> does not take it into account, as the ShuffleDependency does not take
>>>>>>>>>>> into account the indeterminate nature of the HashPartitioner.
>>>>>>>>>>> Moreover, the attribute ref provided to the HashPartitioner, though
>>>>>>>>>>> pointing to an indeterminate alias, has its deterministic flag set to
>>>>>>>>>>> true (which in itself is logically correct).
>>>>>>>>>>> So I feel that, apart from the ShuffleDependency code change, all
>>>>>>>>>>> expressions should have a lazy Boolean, say containsIndeterministic
>>>>>>>>>>> component, which will be true if the expression is indeterministic or
>>>>>>>>>>> contains any attribute ref whose indeterministicComponent is true.
>>>>>>>>>>>
>>>>>>>>>>> And on a personal note, thanks for your interest; this is a very
>>>>>>>>>>> rare attitude.
>>>>>>>>>>> Regards
>>>>>>>>>>> Asif
>>>>>>>>>>>
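
The proposal above, a lazily computed flag that follows an attribute back to the alias it points at, can be illustrated with a toy expression tree. This is a minimal sketch under stated assumptions: Expr, Rand, Column, Alias, AttrRef and hasIndeterministicComponent are hypothetical names for the illustration and are not Catalyst classes or Spark APIs.

```scala
// Toy expression tree; these are NOT Catalyst classes.
sealed trait Expr {
  // Whether this node, taken on its own, is deterministic.
  def deterministic: Boolean
  // Expressions whose values feed this one: children, or the alias behind a reference.
  def inputs: Seq[Expr]
  // True if this node or anything it is derived from is non-deterministic.
  lazy val hasIndeterministicComponent: Boolean =
    !deterministic || inputs.exists(_.hasIndeterministicComponent)
}

case object Rand extends Expr {
  val deterministic: Boolean = false
  val inputs: Seq[Expr] = Nil
}

final case class Column(name: String) extends Expr {
  val deterministic: Boolean = true
  val inputs: Seq[Expr] = Nil
}

final case class Alias(child: Expr, name: String) extends Expr {
  val deterministic: Boolean = child.deterministic
  val inputs: Seq[Expr] = Seq(child)
}

// A reference stays deterministic as a leaf, but remembers the alias it points at.
final case class AttrRef(source: Alias) extends Expr {
  val deterministic: Boolean = true
  val inputs: Seq[Expr] = Seq(source)
}
```

Here AttrRef(Alias(Rand, "pkLeft")) keeps deterministic == true, matching the point that a leaf is logically deterministic, while its hasIndeterministicComponent is true, which is the information a partitioner check would need.
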
>>>>>>>>>>> On Sun, Jan 26, 2025, 9:45 PM Ángel <
>>>>>>>>>>> angel.alvarez.pas...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Asif,
>>>>>>>>>>>>
>>>>>>>>>>>> Could you provide an example (code + dataset) to analyze this?
>>>>>>>>>>>> Looks interesting ...
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Ángel
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Jan 26, 2025 at 20:58, Asif Shahid (<
>>>>>>>>>>>> asif.sha...@gmail.com>) wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> On further thought, I concur that leaf expressions like
>>>>>>>>>>>>> AttributeRefs can always be considered deterministic since, like a
>>>>>>>>>>>>> Java variable, the value contained in one is invariant per iteration
>>>>>>>>>>>>> (except when changed by some deterministic logic). So in that sense,
>>>>>>>>>>>>> what I described as an issue in the mail above is incorrect.
>>>>>>>>>>>>> But I think that AttributeRef should have a boolean method
>>>>>>>>>>>>> which tells whether the value it represents is from an indeterminate
>>>>>>>>>>>>> source or not.
>>>>>>>>>>>>> Regards
>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 24, 2025 at 5:18 PM Asif Shahid <
>>>>>>>>>>>>> asif.sha...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> I was testing a use case where the query had an outer join
>>>>>>>>>>>>>> such that the joining key of the left outer table had either a valid
>>>>>>>>>>>>>> value or a random value (salting to avoid skew).
>>>>>>>>>>>>>> The case was reported to give incorrect results on retry after a
>>>>>>>>>>>>>> node failure.
>>>>>>>>>>>>>> On debugging the code, I found the following, which has left
>>>>>>>>>>>>>> me confused as to what Spark's strategy is for indeterministic fields.
>>>>>>>>>>>>>> Some serious issues are:
>>>>>>>>>>>>>> 1) All leaf expressions, like AttributeReference, are
>>>>>>>>>>>>>> always considered deterministic. This means that if an attribute is
>>>>>>>>>>>>>> pointing to an Alias which itself is indeterministic, the attribute
>>>>>>>>>>>>>> will still be considered deterministic.
>>>>>>>>>>>>>> 2) In CheckAnalysis there is code which checks whether each
>>>>>>>>>>>>>> operator supports indeterministic values or not. Join is not
>>>>>>>>>>>>>> included in the list of supported operators, but it passes even if the
>>>>>>>>>>>>>> joining key is pointing to an indeterministic alias. (When I tried
>>>>>>>>>>>>>> fixing it, I found a plethora of operators failing, like
>>>>>>>>>>>>>> DeserializedObject, LocalRelation, etc., which are not supposed to
>>>>>>>>>>>>>> contain indeterministic attributes because they are not in the list of
>>>>>>>>>>>>>> supporting operators.)
>>>>>>>>>>>>>> 3) The ShuffleDependency does not check for the indeterministic
>>>>>>>>>>>>>> nature of the partitioner (I fixed it locally and then realized that
>>>>>>>>>>>>>> bug #1 needs to be fixed too).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The code in DagScheduler / TaskSet, TaskScheduler, etc. seems
>>>>>>>>>>>>>> to have been written keeping in mind the indeterministic nature of the
>>>>>>>>>>>>>> previous and current stages, so as to re-execute previous stages as a
>>>>>>>>>>>>>> whole instead of just the missing tasks, but the above 3 points do not
>>>>>>>>>>>>>> seem to support the code of DagScheduler / TaskScheduler.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> Asif
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
