i see. it makes sense to maximize re-use of cached data. i didn't realize
we have two potentially conflicting goals here.
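
to make the trade-off concrete for myself, here is a minimal sketch (untested, using the same fruits.csv from the repro below, and assuming a spark 3.x spark-shell): a second aggregation on the cache's grouping key can reuse the cached relation's hash outputPartitioning and skip an exchange, which is exactly what AQE partition coalescing would invalidate:

```scala
// sketch only: reusing the cached relation's outputPartitioning.
// the cached plan is hash-partitioned on "fruit", so a second
// aggregation on "fruit" should not need another shuffle.
import org.apache.spark.sql.functions.sum

val counts = spark.read.format("csv").option("header", true)
  .load("fruits.csv")
  .groupBy("fruit").count()
  .persist()

// if the cache preserves outputPartitioning, this plan should show
// no extra Exchange between the InMemoryTableScan and the aggregate
counts.groupBy("fruit").agg(sum("count")).explain()
```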


On Thu, Aug 20, 2020 at 12:41 PM Maryann Xue <maryann....@databricks.com>
wrote:

> AQE has been turned off deliberately so that the `outputPartitioning` of
> the cached relation won't be changed by AQE partition coalescing or skew
> join optimization and the outputPartitioning can potentially be used by
> relations built on top of the cache.
>
> On second thought, we should probably add a config there and enable AQE
> by default.
>
>
> Thanks,
> Maryann
>
> On Thu, Aug 20, 2020 at 11:12 AM Koert Kuipers <ko...@tresata.com> wrote:
>
>> we tend to have spark.sql.shuffle.partitions set very high by default
>> simply because some jobs need it to be high and it's easier to then just
>> set the default high instead of having people tune it manually per job. the
>> main downside is lots of part files which leads to pressure on the driver,
>> and dynamic allocation becomes troublesome if every aggregation requires
>> thousands of tasks... even the simplest aggregation on tiny small data will
>> demand all resources on the cluster.
>>
>> because of these issues AQE appeals a lot to me: by automatically scaling
>> the reducer partitions we avoid these issues. so we have AQE turned on by
>> default. every once in a while i scan through our spark AMs and logs to see
>> how it's doing. i mostly look for stages that have a number of tasks equal
>> to spark.sql.shuffle.partitions, a sign to me that AQE isn't being
>> effective. unfortunately this seems to be the majority. i suspect it has to
>> do with caching/persisting which we use frequently. a simple reproduction
>> is below.
>>
>> any idea why caching/persisting would interfere with AQE?
>>
>> best, koert
>>
>> $ hadoop fs -text fruits.csv
>> fruit,color,quantity
>> apple,red,5
>> grape,blue,50
>> pear,green,3
>>
>> # works well using AQE, uses 1 to 3 tasks per job
>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>> spark.sql.adaptive.enabled=true
>> scala> val data = spark.read.format("csv").option("header",
>> true).load("fruits.csv").persist()
>> scala> data.groupBy("fruit").count().write.format("csv").save("out")
>>
>> # does not work well using AQE, uses 200 tasks (i.e.
>> spark.sql.shuffle.partitions) for certain jobs. the only difference is
>> where persist is called.
>> $ spark-3.1.0-SNAPSHOT/bin/spark-shell --conf
>> spark.sql.adaptive.enabled=true
>> scala> val data = spark.read.format("csv").option("header",
>> true).load("fruits.csv").groupBy("fruit").count().persist()
>> scala> data.write.format("csv").save("out")
>>
>>