[
https://issues.apache.org/jira/browse/SPARK-46198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Vitaliy Savkin updated SPARK-46198:
-----------------------------------
Attachment: shuffle.png
> Unexpected Shuffle Read when using cached DataFrame
> ---------------------------------------------------
>
> Key: SPARK-46198
> URL: https://issues.apache.org/jira/browse/SPARK-46198
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.1
> Reporter: Vitaliy Savkin
> Priority: Major
> Attachments: shuffle.png
>
>
> When a computation is based on a cached DataFrame, I expect to see no
> Shuffle Reads, but they occur under certain circumstances.
> *Reproduction*
> {code:scala}
> val ctx: SQLContext = // init context
> val root = "s3a://af-data-eu-west-1-stg-parquet/vitalii-test-coalesce"
>
> def populateAndRead(tag: String): DataFrame = {
>   val path = s"$root/numbers_$tag"
>   // import ctx.implicits._
>   // import org.apache.spark.sql.functions.lit
>   // (0 to 10 * 1000 * 1000)
>   //   .toDF("id")
>   //   .withColumn(tag, lit(tag.toUpperCase))
>   //   .repartition(100)
>   //   .write
>   //   .option("header", "true")
>   //   .mode("ignore")
>   //   .csv(path)
>   ctx.read.option("header", "true").csv(path).withColumnRenamed("id", tag + "_id")
> }
>
> val dfa = populateAndRead("a1")
> val dfb = populateAndRead("b1")
> val res =
>   dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
>     .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
>     .cache()
>
> println(res.count())
> res.coalesce(1).write.mode("overwrite").csv(s"$root/numbers")
> {code}
> Relevant configs:
> {code:none}
> spark.executor.instances=10
> spark.executor.cores=7
> spark.executor.memory=40g
> spark.executor.memoryOverhead=5g
> spark.shuffle.service.enabled=true
> spark.sql.adaptive.enabled=false
> spark.sql.autoBroadcastJoinThreshold=-1
> {code}
> The Spark plan shows that the cache is used:
> {code:scala}
> == Physical Plan ==
> Execute InsertIntoHadoopFsRelationCommand (27)
> +- Coalesce (26)
>    +- InMemoryTableScan (1)
>          +- InMemoryRelation (2)
>                +- Union (25)
>                   :- * SortMergeJoin Inner (13)
>                   :  :- * Sort (7)
>                   :  :  +- Exchange (6)
>                   :  :     +- * Project (5)
>                   :  :        +- * Filter (4)
>                   :  :           +- Scan csv (3)
>                   :  +- * Sort (12)
>                   :     +- Exchange (11)
>                   :        +- * Project (10)
>                   :           +- * Filter (9)
>                   :              +- Scan csv (8)
>                   +- * SortMergeJoin Inner (24)
>                      :- * Sort (18)
>                      :  +- Exchange (17)
>                      :     +- * Project (16)
>                      :        +- * Filter (15)
>                      :           +- Scan csv (14)
>                      +- * Sort (23)
>                         +- Exchange (22)
>                            +- * Project (21)
>                               +- * Filter (20)
>                                  +- Scan csv (19)
> {code}
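> For reference, a plan in this numbered format can be printed with the formatted explain mode (available since Spark 3.0); a minimal sketch, assuming the {{res}} DataFrame from the reproduction above:
> {code:scala}
> // Print the physical plan in "formatted" mode, which makes the
> // InMemoryTableScan over the cached relation easy to spot.
> res.explain("formatted")
> {code}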
> But when running on YARN, the csv write job has shuffle reads.
> !image-2023-12-01-09-27-39-463.png!
> *Additional info*
> - I was unable to reproduce it with local Spark.
> - If {{.withColumnRenamed("id", tag + "_id")}} is dropped and the join
> conditions are changed to just {{"id"}}, the issue disappears!
> - This behaviour is stable - it's not a result of failed instances.
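> For clarity, the variant from the second bullet that makes the issue disappear would look roughly like this (a sketch, not the exact production code; it assumes the rename in {{populateAndRead}} is removed so both frames keep the original {{id}} column):
> {code:scala}
> // Hypothetical variant: keep the original "id" column and join on it directly.
> val dfa = ctx.read.option("header", "true").csv(s"$root/numbers_a1")
> val dfb = ctx.read.option("header", "true").csv(s"$root/numbers_b1")
> val res = dfa.join(dfb, Seq("id")).cache()  // no shuffle reads downstream
> {code}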
> *Production impact*
> Without the cache, saving data in production takes much longer (30 seconds vs
> 18 seconds). To avoid shuffle reads, we had to add a {{repartition}} step before
> {{cache}} as a workaround, which reduced the time from 18 seconds to 10.
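> The workaround might be sketched as follows (the partition count of 100 is an assumption for illustration, not the value used in production):
> {code:scala}
> // Workaround: repartition before caching so the cached blocks are already
> // co-located, avoiding shuffle reads in the downstream coalesce/write.
> val res =
>   dfa.join(dfb, dfa("a1_id") === dfb("b1_id"))
>     .unionByName(dfa.join(dfb, dfa("a1") === dfb("b1")))
>     .repartition(100) // assumed partition count
>     .cache()
> {code}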
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]