Hi,
We are able to reproduce this bug in Spark 2.4 using the following program:
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 10000 * 10000, 1).map { x => (x % 1000, x) }.repartition(20)
res.distinct.count
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
df.distinct.count()
The first distinct count (res.distinct.count) correctly produces 100000000.
The second distinct count (df.distinct.count()) incorrectly produces 99999769.
If the cache step is removed, the bug does not reproduce; a sketch of that variant is below.
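For reference, this is the same pipeline with only the .cache call dropped (a sketch, everything else unchanged, run in the same spark-shell session with the same executor kill); with this variant the count comes back as the expected 100000000:

// Same repro, but without caching the intermediate result
val dfNoCache = res.repartition(113).repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
dfNoCache.distinct.count()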
Best regards,
Tyson