I see. It makes sense to maximize re-use of cached data; I didn't realize we have two potentially conflicting goals here.
On Thu, Aug 20, 2020 at 12:41 PM Maryann Xue <maryann....@databricks.com> wrote:

> AQE has been turned off deliberately so that the `outputPartitioning` of
> the cached relation won't be changed by AQE partition coalescing or skew
> join optimization, and the outputPartitioning can potentially be used by
> relations built on top of the cache.
>
> On second thought, we should probably add a config there and enable AQE
> by default.
>
>
> Thanks,
> Maryann
>
> On Thu, Aug 20, 2020 at 11:12 AM Koert Kuipers <ko...@tresata.com> wrote:
>
>> We tend to have spark.sql.shuffle.partitions set very high by default,
>> simply because some jobs need it to be high and it's easier to set the
>> default high than to have people tune it manually per job. The main
>> downside is lots of part files, which puts pressure on the driver, and
>> dynamic allocation becomes troublesome if every aggregation requires
>> thousands of tasks: even the simplest aggregation on tiny data will
>> demand all resources on the cluster.
>>
>> Because of these issues AQE appeals a lot to me: by automatically
>> scaling the reducer partitions it avoids them, so we have AQE turned on
>> by default. Every once in a while I scan through our Spark AMs and logs
>> to see how it's doing. I mostly look for stages whose number of tasks
>> equals spark.sql.shuffle.partitions, a sign to me that AQE isn't being
>> effective. Unfortunately this seems to be the majority. I suspect it has
>> to do with caching/persisting, which we use frequently. A simple
>> reproduction is below.
>>
>> Any idea why caching/persisting would interfere with AQE?
>>
>> Best,
>> Koert
>>
>> $ hadoop fs -text fruits.csv
>> fruit,color,quantity
>> apple,red,5
>> grape,blue,50
>> pear,green,3
>>
>> # works well with AQE: uses 1 to 3 tasks per job
>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
>> scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").persist()
>> scala> data.groupBy("fruit").count().write.format("csv").save("out")
>>
>> # does not work well with AQE: uses 200 tasks (i.e.
>> # spark.sql.shuffle.partitions) for certain jobs. the only difference
>> # is when persist is called.
>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf spark.sql.adaptive.enabled=true
>> scala> val data = spark.read.format("csv").option("header", true).load("fruits.csv").groupBy("fruit").count().persist()
>> scala> data.write.format("csv").save("out")
>>
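PS: for anyone landing on this thread later, here is a sketch of the settings involved as a spark-defaults.conf fragment. The first two are standard configs; the third is the toggle Maryann suggests adding, so treat its name and availability as an assumption until it actually ships in a release (it is not in the 3.1.0 snapshot used above):

```
# enable adaptive query execution globally
spark.sql.adaptive.enabled        true

# upper bound on reducer parallelism; AQE coalesces partitions down from this
spark.sql.shuffle.partitions      2000

# hypothetical (the config proposed above, name assumed): allow AQE to change
# the outputPartitioning of cached/persisted plans
spark.sql.optimizer.canChangeCachedPlanOutputPartitioning  true
```

With the third setting off (the behavior described in this thread), any plan under a persist() keeps exactly spark.sql.shuffle.partitions shuffle partitions; with it on, cached plans would trade a stable outputPartitioning for AQE's coalescing.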