[ 
https://issues.apache.org/jira/browse/SPARK-23207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yucai updated SPARK-23207:
--------------------------
    Description: 
Currently shuffle repartition uses RoundRobinPartitioning, the generated result 
is nondeterministic since the sequence of input rows are not determined.

The bug can be triggered when there is a repartition call following a shuffle 
(which would lead to non-deterministic row ordering), as the pattern shows 
below:
upstream stage -> repartition stage -> result stage
(-> indicate a shuffle)
When one of the executors process goes down, some tasks on the repartition 
stage will be retried and generate inconsistent ordering, and some tasks of the 
result stage will be retried generating different data.

The following code returns 931532, instead of 1000000:
{code:java}
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
{code}

  was:
Currently shuffle repartition uses RoundRobinPartitioning, the generated result 
is nondeterministic since the sequence of input rows are not determined.

The bug can be triggered when there is a repartition call following a shuffle 
(which would lead to non-deterministic row ordering), as the pattern shows 
below:
upstream stage -> repartition stage -> result stage
(-> indicate a shuffle)
When one of the executors process goes down, some tasks on the repartition 
stage will be retried and generate inconsistent ordering, and some tasks of the 
result stage will be retried generating different data.

The following code returns 931532, instead of 1000000:
{code}
import scala.sys.process._

import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
{code}


> Shuffle+Repartition on an DataFrame could lead to incorrect answers
> -------------------------------------------------------------------
>
>                 Key: SPARK-23207
>                 URL: https://issues.apache.org/jira/browse/SPARK-23207
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0, 2.0.0, 2.1.0, 2.2.0, 2.3.0
>            Reporter: Xingbo Jiang
>            Assignee: Xingbo Jiang
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 2.1.4, 2.2.3, 2.3.0
>
>
> Currently shuffle repartition uses RoundRobinPartitioning, the generated 
> result is nondeterministic since the sequence of input rows are not 
> determined.
> The bug can be triggered when there is a repartition call following a shuffle 
> (which would lead to non-deterministic row ordering), as the pattern shows 
> below:
> upstream stage -> repartition stage -> result stage
> (-> indicate a shuffle)
> When one of the executors process goes down, some tasks on the repartition 
> stage will be retried and generate inconsistent ordering, and some tasks of 
> the result stage will be retried generating different data.
> The following code returns 931532, instead of 1000000:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
>   x
> }.repartition(200).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
>     throw new Exception("pkill -f java".!!)
>   }
>   x
> }
> res.distinct().count()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to