André F. created SPARK-48290:
--------------------------------
Summary: AQE not working when joining dataframes with more than
2000 partitions
Key: SPARK-48290
URL: https://issues.apache.org/jira/browse/SPARK-48290
Project: Spark
Issue Type: Question
Components: Optimizer, SQL
Affects Versions: 3.5.1, 3.3.2
Environment: spark-standalone
spark3.5.1
Reporter: André F.
We are joining 2 large dataframes with a considerable skew on the left side in
one specific key (>2000 skew ratio). Since we have
`spark.sql.adaptive.enabled` set, we expect AQE to act during the join,
dealing with the skewed partition automatically.
During the join, we can see the following log, which indicates that the skew
was not detected: the partition statistics look oddly identical for
min/median/max sizes:
{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 0, right 0
OptimizeSkewedJoin:
Optimizing skewed join.
Left side partitions size info:
median size: 780925482, max size: 780925482, min size: 780925482, avg size: 780925482
Right side partitions size info:
median size: 3325797, max size: 3325797, min size: 3325797, avg size: 3325797
{code}
Looking at this log line and the spark configuration possibilities, our two
main hypotheses to work around this behavior and correctly detect the skew were:
# Increasing `spark.shuffle.minNumPartitionsToHighlyCompress` so that Spark
doesn’t convert the statistics into a `HighlyCompressedMapStatus` and is
therefore able to identify the skewed partition.
# Allowing Spark to use a `HighlyCompressedMapStatus`, but changing other
configurations such as `spark.shuffle.accurateBlockThreshold` and
`spark.shuffle.accurateBlockSkewedFactor` so that the size of the skewed
partitions/blocks is still accurately registered and consequently used in the
optimization.
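The mechanism behind the second hypothesis can be illustrated with a toy model. The sketch below is not Spark's code: it assumes only that a highly compressed status reports the average size for non-empty blocks below `spark.shuffle.accurateBlockThreshold` and the exact size for blocks at or above it, and it ignores every other detail (including `spark.shuffle.accurateBlockSkewedFactor`). The function names and the data are hypothetical.

```python
from statistics import median


def highly_compressed_sizes(block_sizes, accurate_block_threshold):
    """Toy model of one map task's size report (not Spark's actual code).

    Non-empty blocks below the threshold are replaced by their average;
    blocks at or above the threshold keep their exact size.
    """
    small = [s for s in block_sizes if 0 < s < accurate_block_threshold]
    avg_small = sum(small) // len(small) if small else 0
    return [
        0 if s == 0 else (s if s >= accurate_block_threshold else avg_small)
        for s in block_sizes
    ]


def reducer_stats(map_outputs, accurate_block_threshold):
    """Sum the reported block sizes per reducer; return (min, median, max)."""
    reported = [
        highly_compressed_sizes(m, accurate_block_threshold) for m in map_outputs
    ]
    totals = [sum(column) for column in zip(*reported)]
    return min(totals), median(totals), max(totals)


# Hypothetical data: 10 map tasks, 9 reducers, reducer 3 gets 50x more data.
map_outputs = [[10, 10, 10, 500, 10, 10, 10, 10, 10] for _ in range(10)]

# Threshold above every block: all sizes collapse to the same average.
flat = reducer_stats(map_outputs, accurate_block_threshold=1000)
# Threshold below the skewed block: its exact size survives.
sharp = reducer_stats(map_outputs, accurate_block_threshold=100)

print(flat)   # (640, 640, 640)
print(sharp)  # (100, 100, 5000)
```

In this toy model, identical min/median/max values appear whenever every per-map block stays below the threshold, and lowering the threshold is enough to make the skewed reducer visible again.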
We tried different values for `spark.shuffle.accurateBlockThreshold` (even
absurd ones like 1MB) and nothing seemed to work. The statistics still indicate
that the min, median and max are the same and thus the skew is not detected.
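For reference, the settings discussed so far can be collected in one place. The sketch below renders them as `spark-submit` arguments; the keys are Spark configuration names, while the values are placeholders chosen for illustration (only the 1MB threshold comes from the description above) and the helper `to_submit_flags` is hypothetical.

```python
# Keys are Spark configuration names; values are illustrative placeholders.
skew_related_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.shuffle.minNumPartitionsToHighlyCompress": "5000",
    "spark.shuffle.accurateBlockThreshold": str(1 * 1024 * 1024),  # 1MB
    "spark.shuffle.accurateBlockSkewedFactor": "5.0",
}


def to_submit_flags(conf):
    """Render a config dict as a flat list of spark-submit arguments."""
    flags = []
    for key, value in sorted(conf.items()):
        flags += ["--conf", f"{key}={value}"]
    return flags


flags = to_submit_flags(skew_related_conf)
print(" ".join(flags))
```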
However, when forcibly reducing `spark.sql.shuffle.partitions` to fewer than
2000 partitions, the statistics look correct and the skewed join optimization
acts as it should:
{code:java}
OptimizeSkewedJoin: number of skewed partitions: left 1, right 0
OptimizeSkewedJoin: Left side partition 42 (263 GB) is skewed, split it into 337 parts.
OptimizeSkewedJoin:
Optimizing skewed join.
Left side partitions size info:
median size: 862803419, max size: 282616632301, min size: 842320875, avg size: 1019367139
Right side partitions size info:
median size: 4320067, max size: 4376957, min size: 4248989, avg size: 4319766
{code}
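The difference between the two log excerpts can be checked against the skew test that AQE documents: a partition counts as skewed when it is larger than the median multiplied by `spark.sql.adaptive.skewJoin.skewedPartitionFactor` and also larger than `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`. The sketch below assumes the documented defaults (5.0 and 256MB) were in effect, which the description does not state; `is_skewed` is a hypothetical helper, not a Spark API.

```python
def is_skewed(size, median_size, factor=5.0, threshold_bytes=256 * 1024 * 1024):
    """Skew test: above factor * median AND above the byte threshold."""
    return size > median_size * factor and size > threshold_bytes


# Left-side max and median sizes copied from the two log excerpts.
with_averaged_stats = is_skewed(size=780925482, median_size=780925482)
with_accurate_stats = is_skewed(size=282616632301, median_size=862803419)

print(with_averaged_stats, with_accurate_stats)  # False True
```

When max and median are equal, the first condition can never hold, so no partition is flagged regardless of the real data distribution.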
Should we assume that the statistics are becoming corrupted when Spark uses
`HighlyCompressedMapStatus`? Should we try another configuration property to
work around this problem? (Assuming that fine-tuning all dataframes in
skewed joins in our ETL to have fewer than 2000 partitions is not an option.)