[
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17800682#comment-17800682
]
Wei Lu commented on SPARK-38388:
--------------------------------
We had the same problem (using Spark 3.2.1). Is there any plan to fix it?
> Repartition + Stage retries could lead to incorrect data
> ---------------------------------------------------------
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.x
> Reporter: Jason Xu
> Priority: Major
> Labels: correctness, data-loss
>
> Spark repartition uses RoundRobinPartitioning, and the generated result is
> non-deterministic when the data has some randomness and stage/task retries happen.
> The bug can be triggered when the upstream data has some randomness, a
> repartition is called on it, and a result stage follows (there could be more
> stages in between).
> The pattern looks like this:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
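> A minimal, standalone sketch (illustrative only, not Spark internals) of the
> position-based assignment that RoundRobinPartitioning performs inside each map
> task: the target partition of a row depends purely on the row's position in
> that task's input, so any change in row order or row content moves rows to
> different reducers.
> {code:java}
> // Illustrative round-robin assignment: partition = (start + position) % numPartitions.
> def roundRobin[T](rows: Seq[T], numPartitions: Int, startPartition: Int = 0): Seq[(Int, T)] =
>   rows.zipWithIndex.map { case (row, i) => ((startPartition + i) % numPartitions, row) }
>
> // Same rows in a different order land in different partitions.
> println(roundRobin(Seq("a", "b", "c", "d"), 2)) // a,c -> partition 0; b,d -> partition 1
> println(roundRobin(Seq("b", "a", "d", "c"), 2)) // b,d -> partition 0; a,c -> partition 1{code}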
> When an executor goes down during the result stage, some tasks of that stage
> may have already finished while others fail, and the shuffle files on that
> executor are lost, so some tasks from the previous stages (upstream data
> generation, repartition) need to rerun to regenerate the shuffle data files
> they produced.
> Because the data has some randomness, the data regenerated by the retried
> upstream tasks is slightly different, repartition then produces an inconsistent
> row ordering, and the retried result-stage tasks generate different data than
> the ones that already finished.
> This is similar to, but different from,
> https://issues.apache.org/jira/browse/SPARK-23207. The fix for that issue adds
> an extra local sort to make the row ordering deterministic; the sorting simply
> compares row/record hashes. But in this case the upstream data itself has some
> randomness, so the sort cannot keep the order stable across retries, and
> RoundRobinPartitioning still produces a non-deterministic result.
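> As a rough illustration of why that mitigation is not enough here, the sketch
> below (plain Scala, illustrative names, not Spark internals) sorts rows by a
> record hash before round-robin assignment. For a fixed input the order, and
> thus the assignment, is deterministic; but when the upstream rows are
> regenerated with different random values on retry, the sorted order changes
> and rows end up in different partitions than in the first attempt.
> {code:java}
> import scala.util.Random
> import scala.util.hashing.MurmurHash3
>
> // Local sort by record hash (the SPARK-23207 idea), then round-robin by position.
> def roundRobinWithLocalSort(rows: Seq[(Long, Double)], numPartitions: Int): Map[Int, Seq[(Long, Double)]] =
>   rows.sortBy(r => MurmurHash3.productHash(r))
>     .zipWithIndex
>     .map { case (row, i) => (i % numPartitions, row) }
>     .groupBy(_._1)
>     .map { case (p, rs) => (p, rs.map(_._2)) }
>
> // First attempt: each id paired with a random value.
> val attempt1 = (0L until 10L).map(id => (id, Random.nextDouble()))
> // Retried attempt: same ids, but the random values differ, so the hash-based
> // sort produces a different order and ids move to different partitions.
> val attempt2 = (0L until 10L).map(id => (id, Random.nextDouble()))
>
> println(roundRobinWithLocalSort(attempt1, 3))
> println(roundRobinWithLocalSort(attempt2, 3)){code}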
> The following code returns 986415, instead of 1000000:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
>
> case class TestObject(id: Long, value: Double)
>
> val ds = spark.range(0, 1000 * 1000, 1)
>   .repartition(100, $"id")
>   .withColumn("val", rand())   // upstream data with randomness
>   .repartition(100)            // round-robin repartition
>   .map { row =>
>     // On the first attempt, kill the JVMs on executors running the last few
>     // result partitions, forcing task failures and shuffle file loss.
>     if (TaskContext.get.stageAttemptNumber == 0 &&
>         TaskContext.get.attemptNumber == 0 &&
>         TaskContext.get.partitionId > 97) {
>       throw new Exception("pkill -f java".!!)
>     }
>     TestObject(row.getLong(0), row.getDouble(1))
>   }
>
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command:
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, the external shuffle service needs to be disabled (if
> it is enabled by default in your environment); this is what triggers the
> shuffle file loss and the retries of the previous stages.
> In our production environment the external shuffle service is enabled, and this
> data correctness issue happened when there were node losses.
> Although there is some non-deterministic factor in the upstream data, users
> would not expect to see an incorrect result.
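> A small follow-up check (illustrative, not part of the original report) that
> makes the corruption visible after running the repro above: duplicated ids show
> rows written twice across the original and retried result-stage tasks, and
> missing ids show rows that were dropped.
> {code:java}
> val written = spark.table("tmp.test_table")
> // Ids written more than once by the original and retried result-stage tasks.
> written.groupBy("id").count().filter($"count" > 1).show()
> // Ids from the expected range that were never written at all.
> spark.range(0, 1000 * 1000).toDF("id").except(written.select("id")).show(){code}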