Vitaliy Savkin created SPARK-46198:
--------------------------------------
Summary: Unexpected Shuffle Read when using cached DataFrame
Key: SPARK-46198
URL: https://issues.apache.org/jira/browse/SPARK-46198
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.2.1
Reporter: Vitaliy Savkin
Attachments: shuffle.png
When a computation is based on cached DataFrames, I expect to see no Shuffle Reads, but they appear under certain circumstances.
*Reproduction*
{code:scala}
import org.apache.spark.sql.{DataFrame, SQLContext}

val ctx: SQLContext = ??? // init context
val root = "s3a://af-data-eu-west-1-stg-parquet/vitalii-test-coalesce"

def populateAndRead(tag: String): DataFrame = {
  val path = s"$root/numbers_$tag"
  // One-time population of the test data (kept commented out after the first run):
  // import ctx.implicits._
  // import org.apache.spark.sql.functions.lit
  // (0 to 10 * 1000 * 1000)
  //   .toDF("id")
  //   .withColumn(tag, lit(tag.toUpperCase))
  //   .repartition(100)
  //   .write
  //   .option("header", "true")
  //   .mode("ignore")
  //   .csv(path)

  ctx.read.option("header", "true").csv(path).withColumnRenamed("id", tag + "_id")
}

val dfa = populateAndRead("a1")
val dfb = populateAndRead("b1")

val res =
  dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
    .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
    .cache()

println(res.count())
res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
{code}
Relevant configs:
{code}
spark.executor.instances=10
spark.executor.cores=7
spark.executor.memory=40g
spark.executor.memoryOverhead=5g
spark.shuffle.service.enabled=true
spark.sql.adaptive.enabled=false
spark.sql.autoBroadcastJoinThreshold=-1
{code}
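For completeness, a minimal sketch of how the same settings could be applied on a {{SparkSession}} builder (illustrative only; the application name is made up, and in practice the values may equally come from {{spark-defaults.conf}} or {{spark-submit --conf}} flags):
{code:scala}
import org.apache.spark.sql.SparkSession

// Illustrative sketch: same settings as above, applied programmatically.
val spark = SparkSession.builder()
  .appName("cached-df-shuffle-read-repro") // hypothetical name
  .config("spark.executor.instances", "10")
  .config("spark.executor.cores", "7")
  .config("spark.executor.memory", "40g")
  .config("spark.executor.memoryOverhead", "5g")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.sql.adaptive.enabled", "false")
  .config("spark.sql.autoBroadcastJoinThreshold", "-1")
  .getOrCreate()
{code}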
The Spark plan shows that the cache is used:
{code}
== Physical Plan ==
Execute InsertIntoHadoopFsRelationCommand (27)
+- Coalesce (26)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- Union (25)
:- * SortMergeJoin Inner (13)
: :- * Sort (7)
: : +- Exchange (6)
: : +- * Project (5)
: : +- * Filter (4)
: : +- Scan csv (3)
: +- * Sort (12)
: +- Exchange (11)
: +- * Project (10)
: +- * Filter (9)
: +- Scan csv (8)
+- * SortMergeJoin Inner (24)
:- * Sort (18)
: +- Exchange (17)
: +- * Project (16)
: +- * Filter (15)
: +- Scan csv (14)
+- * Sort (23)
+- Exchange (22)
+- * Project (21)
+- * Filter (20)
+- Scan csv (19)
{code}
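For reference, this is how the cache usage can be double-checked from the {{res}} DataFrame of the reproduction above (a sketch; the string match on the node name is just a quick heuristic):
{code:scala}
// Print the formatted physical plan (Spark 3.x): the scan should go through
// InMemoryTableScan / InMemoryRelation rather than re-reading the CSVs.
res.explain("formatted")

// Quick programmatic heuristic: look for the in-memory scan node in the executed plan.
val usesCache = res.queryExecution.executedPlan.toString.contains("InMemoryTableScan")
println(s"uses cache: $usesCache")
{code}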
But when running on YARN, the CSV write job shows shuffle reads.
!image-2023-12-01-09-27-39-463.png!
*Additional info*
- I was unable to reproduce it with local Spark.
- If {{.withColumnRenamed("id", tag + "_id")}} is dropped and the join conditions are changed to just {{"id"}}, the issue disappears (see the sketch after this list).
- This behaviour is stable; it is not a result of failed instances.
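A sketch of that variant, in my reading of it (reusing {{ctx}} and {{root}} from the reproduction above; no rename, both joins on the shared {{id}} column):
{code:scala}
// Same reader as populateAndRead but without .withColumnRenamed("id", tag + "_id"),
// so both frames keep the original "id" column.
def readPlain(tag: String): DataFrame =
  ctx.read.option("header", "true").csv(s"$root/numbers_$tag")

val dfa = readPlain("a1")
val dfb = readPlain("b1")

// Joining on the column name instead of explicit equality conditions;
// with this shape the unexpected shuffle reads do not appear.
val res =
  dfa.join(dfb, "id")
    .unionByName(dfa.join(dfb, "id"))
    .cache()
{code}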
*Production impact*
Without the cache, saving the data in production takes much longer (30 seconds vs. 18 seconds). To avoid the unexpected shuffle reads, we had to add a {{repartition}} step before {{cache}} as a workaround, which further reduced the time from 18 seconds to 10.
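A sketch of the workaround as applied to the reproduction above (the partition count of 100 is illustrative):
{code:scala}
// Workaround: add an explicit repartition before cache().
val res =
  dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
    .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
    .repartition(100) // illustrative partition count
    .cache()

println(res.count())
res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
{code}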