we tend to have spark.sql.shuffle.partitions set very high by default,
simply because some jobs need it that high and it's easier to set the
default high than to have people tune it manually per job. the main
downsides are the large number of part files, which puts pressure on the
driver, and that dynamic allocation becomes troublesome when every
aggregation requires thousands of tasks: even the simplest aggregation on
tiny data will demand all the resources on the cluster.
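
for context, a rough sketch of the kind of cluster-wide default i mean (the
2000 below is just an illustrative number, and in practice this lives in
spark-defaults.conf rather than in application code):

import org.apache.spark.sql.SparkSession

// illustrative only: a high cluster-wide default so the big jobs are covered,
// at the cost of small aggregations also fanning out to thousands of tasks
val spark = SparkSession.builder()
  .master("local[*]") // local master just so the sketch runs standalone
  .config("spark.sql.shuffle.partitions", "2000")
  .config("spark.sql.adaptive.enabled", "true") // AQE, so reducers can be coalesced back down
  .getOrCreate()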

because of these issues AQE appeals a lot to me: by automatically scaling
down the reducer partitions it avoids them. so we have AQE turned on by
default. every once in a while i scan through our spark AMs and logs to see
how it's doing. i mostly look for stages with a number of tasks equal to
spark.sql.shuffle.partitions, which to me is a sign that AQE isn't being
effective. unfortunately such stages seem to be the majority. i suspect it
has to do with caching/persisting, which we use frequently. a simple
reproduction is below.

any idea why caching/persisting would interfere with AQE?

best, koert

$ hadoop fs -text fruits.csv
fruit,color,quantity
apple,red,5
grape,blue,50
pear,green,3

# works well using AQE, uses 1 to 3 tasks per job
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").persist()
scala> data.groupBy("fruit").count().write.format("csv").save("out")
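
# in case it helps: my understanding is that calling explain() on the same
# dataframe after an action has run prints the finalized adaptive plan, so
# something like this can confirm the coalescing without digging through the UI
scala> val agg = data.groupBy("fruit").count()
scala> agg.collect()  // trigger execution so AQE can finalize the plan
scala> agg.explain()  // should now show AdaptiveSparkPlan isFinalPlan=true with the
                      // shuffle read coalesced well below spark.sql.shuffle.partitions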

# does not work well using AQE, uses 200 tasks (i.e. spark.sql.shuffle.partitions)
# for certain jobs. the only difference is when persist is called.
$ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").count().persist()
scala> data.write.format("csv").save("out")
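
# for reference, these are the settings i believe govern the coalescing, with
# what i understand to be their 3.0/3.1 defaults; nothing besides
# spark.sql.adaptive.enabled is overridden in the reproduction above
scala> spark.conf.get("spark.sql.adaptive.coalescePartitions.enabled")   // default true when AQE is on
scala> spark.conf.get("spark.sql.adaptive.advisoryPartitionSizeInBytes") // default 64MB target per coalesced partition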
