AQE has been turned off deliberately for cached relations so that the
`outputPartitioning` of the cached relation won't be changed by AQE
partition coalescing or skew join optimization; the `outputPartitioning`
can then potentially be used by relations built on top of the cache.
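
For illustration (a minimal sketch, not from this thread): if a cached
relation is hash-partitioned on a key, a join on that key can reuse that
partitioning and skip a shuffle on the cached side, which AQE coalescing
inside the cache would break.

scala> val left = spark.range(100).toDF("k").repartition($"k").persist()
scala> left.count()  // materialize the cache
scala> val right = spark.range(100).toDF("k")
scala> left.join(right, "k").explain()
// the cached side shows no Exchange above the InMemoryTableScan: its
// hashpartitioning(k) is reused by the join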

On second thought, we should probably add a config for this and enable AQE
there by default.


Thanks,
Maryann

On Thu, Aug 20, 2020 at 11:12 AM Koert Kuipers <ko...@tresata.com> wrote:

> We tend to have spark.sql.shuffle.partitions set very high by default,
> simply because some jobs need it to be high and it's easier to set the
> default high than to have people tune it manually per job. The main
> downside is lots of part files, which puts pressure on the driver; also,
> dynamic allocation becomes troublesome if every aggregation requires
> thousands of tasks: even the simplest aggregation on tiny data will
> demand all the resources on the cluster.
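>
> For illustration (hypothetical numbers, not from the original setup): with
> AQE off, even a trivial aggregation on ten rows schedules a full set of
> reducer tasks.
>
> $ spark-shell --conf spark.sql.shuffle.partitions=4096 --conf spark.sql.adaptive.enabled=false
> scala> spark.range(10).groupBy($"id" % 2).count().collect()
> // runs 4096 reducer tasks for 10 rows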
>
> Because of these issues AQE appeals a lot to me: by automatically scaling
> the reducer partitions it avoids them. So we have AQE turned on by
> default. Every once in a while I scan through our Spark AMs and logs to
> see how it's doing. I mostly look for stages whose task count equals
> spark.sql.shuffle.partitions, a sign to me that AQE isn't being
> effective. Unfortunately this seems to be the majority. I suspect it has
> to do with caching/persisting, which we use frequently. A simple
> reproduction is below.
>
> Any idea why caching/persisting would interfere with AQE?
>
> Best, Koert
>
> $ hadoop fs -text fruits.csv
> fruit,color,quantity
> apple,red,5
> grape,blue,50
> pear,green,3
>
> # Works well with AQE: uses 1 to 3 tasks per job
> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
> spark.sql.adaptive.enabled=true
> scala> val data = spark.read.format("csv").option("header",
> true).load("fruits.csv").persist()
> scala> data.groupBy("fruit").count().write.format("csv").save("out")
>
> # Does not work well with AQE: uses 200 tasks (i.e.
> # spark.sql.shuffle.partitions) for certain jobs. The only difference is
> # where persist is called.
> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
> spark.sql.adaptive.enabled=true
> scala> val data = spark.read.format("csv").option("header",
> true).load("fruits.csv").groupBy("fruit").count().persist()
> scala> data.write.format("csv").save("out")
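>
> # A quick way to confirm (illustrative, not part of the original repro):
> # the plan for the persisted data shows an InMemoryRelation whose inner
> # plan has no AdaptiveSparkPlan node, so its shuffle falls back to
> # spark.sql.shuffle.partitions.
> scala> data.explain()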