Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22112#discussion_r210964794
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -1864,6 +1877,22 @@ abstract class RDD[T: ClassTag](
       // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long
       // RDD chain.
       @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier())
    +
    +  /**
    +   * Whether the RDD's computing function is idempotent. Idempotent means the computing function
    +   * not only satisfies the requirement, but also produces the same output sequence (the output
    +   * order can't vary) given the same input sequence. Spark assumes all RDDs are idempotent, except
    +   * for shuffle RDDs and RDDs derived from non-idempotent RDDs.
    +   */
    +  // TODO: Add public APIs to allow users to mark their RDD as non-idempotent.
    +  // TODO: This can be per-partition, e.g. UnionRDD can have part of its partitions idempotent.
    +  private[spark] def isIdempotent: Boolean = {
    +    dependencies.forall { dep =>
    +      // Shuffle RDD is always considered as non-idempotent, because its computing function needs
    +      // to fetch remote shuffle blocks, and these fetched blocks may arrive in a random order.
    +      !dep.isInstanceOf[ShuffleDependency[_, _, _]] && dep.rdd.isIdempotent
    --- End diff --
    
    This is too strict.
    As I discussed with @jiangxb1987, something like this would be better:
    ```
      dep =>
        dep match {
          case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep.keyOrdering.isDefined
          // IIRC this is not comprehensive if checkpoint is happening as part of this job.
          case checkpointedDep: Dependency[_] if checkpointedDep.rdd.isCheckpointed => true
          case _ => dep.rdd.isIdempotent
        }
    ```
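
    For context, a minimal sketch of how that predicate could slot back into the `dependencies.forall { ... }` body added by this diff (illustrative only; assumes the method stays in RDD.scala as above):
    ```
      private[spark] def isIdempotent: Boolean = {
        dependencies.forall {
          // A shuffle with a defined key ordering produces a deterministic output order.
          case shuffleDep: ShuffleDependency[_, _, _] => shuffleDep.keyOrdering.isDefined
          // A checkpointed parent is read back from stable storage, so treat it as idempotent.
          case checkpointedDep: Dependency[_] if checkpointedDep.rdd.isCheckpointed => true
          // Otherwise idempotency is inherited from the parent RDD.
          case dep => dep.rdd.isIdempotent
        }
      }
    ```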
    
    Note that this method can end up with stack overflow errors - please refer to `DAGScheduler.stageDependsOn`, which does a similar dependency traversal (but for a different purpose).
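
    As an illustration only (not part of the PR), one way to avoid that is an iterative lineage walk in the spirit of `DAGScheduler.stageDependsOn`, with an explicit stack and a visited set; the method name and its placement in RDD.scala are assumptions:
    ```
      // Sketch: walk the lineage iteratively so a very long RDD chain cannot overflow the call stack.
      private[spark] def isIdempotentIterative: Boolean = {
        val visited = new scala.collection.mutable.HashSet[RDD[_]]
        val waitingForVisit = new scala.collection.mutable.ArrayStack[RDD[_]]
        waitingForVisit.push(this)
        while (waitingForVisit.nonEmpty) {
          val rdd = waitingForVisit.pop()
          if (visited.add(rdd)) {
            rdd.dependencies.foreach {
              case shuffleDep: ShuffleDependency[_, _, _] =>
                // An unordered shuffle may fetch blocks in a random order: not idempotent.
                if (shuffleDep.keyOrdering.isEmpty) return false
              case dep if dep.rdd.isCheckpointed =>
                // A checkpointed parent is read from stable storage; no need to walk past it.
                ()
              case dep =>
                // Otherwise keep walking up the lineage.
                waitingForVisit.push(dep.rdd)
            }
          }
        }
        true
      }
    ```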


