AQE has been turned off deliberately so that the `outputPartitioning` of the cached relation won't be changed by AQE partition coalescing or skew-join optimization; that way, the outputPartitioning can potentially be reused by queries built on top of the cache.
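To make concrete why coalescing would invalidate a cached plan's partitioning, here is a toy sketch of the general idea behind size-based partition coalescing. This is only an illustration in Python, not Spark's actual code; the function name and the grouping heuristic are assumptions:

```python
# Toy sketch of AQE-style shuffle-partition coalescing -- an illustration
# of the idea only, not Spark's implementation. Adjacent reducer partitions
# are merged until each group reaches roughly target_size bytes, so the
# number (and identity) of output partitions depends on runtime statistics.
def coalesce_partitions(sizes, target_size):
    groups = []
    current, current_bytes = [], 0
    for idx, size in enumerate(sizes):
        # start a new group once adding this partition would overflow the target
        if current and current_bytes + size > target_size:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(idx)
        current_bytes += size
    if current:
        groups.append(current)
    return groups
```

Because the resulting grouping depends on the observed shuffle sizes, a consumer that assumed the cached relation's original partitioning could no longer rely on it.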
On second thought, we should probably add a config there and enable AQE by default.

Thanks,
Maryann

On Thu, Aug 20, 2020 at 11:12 AM Koert Kuipers <ko...@tresata.com> wrote:
> we tend to have spark.sql.shuffle.partitions set very high by default
> simply because some jobs need it to be high and it's easier to then just
> set the default high instead of having people tune it manually per job. the
> main downside is lots of part files which leads to pressure on the driver,
> and dynamic allocation becomes troublesome if every aggregation requires
> thousands of tasks... even the simplest aggregation on tiny small data will
> demand all resources on the cluster.
>
> because of these issues AQE appeals a lot to me: by automatically scaling
> the reducer partitions we avoid these issues. so we have AQE turned on by
> default. every once in a while i scan through our spark AMs and logs to see
> how it's doing. i mostly look for stages that have a number of tasks equal
> to spark.sql.shuffle.partitions, a sign to me that AQE isn't being
> effective. unfortunately this seems to be the majority. i suspect it has to
> do with caching/persisting which we use frequently. a simple reproduction
> is below.
>
> any idea why caching/persisting would interfere with AQE?
>
> best, koert
>
> $ hadoop fs -text fruits.csv
> fruit,color,quantity
> apple,red,5
> grape,blue,50
> pear,green,3
>
> # works well using AQE, uses 1 to 3 tasks per job
> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
> scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").persist()
> scala> data.groupBy("fruit").count().write.format("csv").save("out")
>
> # does not work well using AQE, uses 200 tasks (i.e.
> spark.sql.shuffle.partitions) for certain jobs. the only difference is when
> persist is called.
> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
> scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").count().persist()
> scala> data.write.format("csv").save("out")
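For completeness, the other AQE rewrite mentioned at the top of this thread, skew-join optimization, can be sketched the same way. Again a toy illustration in Python, not Spark's code; the name and splitting scheme are assumptions:

```python
import math

# Toy sketch of AQE-style skew-join splitting -- illustration only, not
# Spark's implementation. A partition larger than the skew threshold is
# divided into roughly equal sub-partitions of at most target_size bytes,
# which likewise changes the partitioning a cached plan would have reported.
def split_skewed_partition(size, target_size):
    num_splits = max(1, math.ceil(size / target_size))
    base, remainder = divmod(size, num_splits)
    # spread the remainder so the splits sum back to the original size
    return [base + (1 if i < remainder else 0) for i in range(num_splits)]
```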