Hi Spark devs,

I recently worked on a prototype that makes it easier to identify the root cause of data skew in Spark. I wanted to see if the community is interested in it before working on contributing the changes (SPIP and PRs).
*Problem*
When a query has data skew today, you can see outlier tasks that take a long time, but that's all. The natural next question is what is causing the skew, and answering it takes a lot of manual debugging (e.g. modifying queries, materializing intermediate datasets, etc.).

*Solution*
I created a new inline operator that uses accumulators to count and surface the top K most common values for an expression. This narrows down the source of skew by telling the user which subquery is skewed and which value in the data is causing it. I also added an analyzer rule that enables inline key counting for all joins. The motivation is to let someone debug data skew in a production job without code changes or deploying a test job.

Here is what it would look like in the Spark History Server (SHS) for this example code:

    $ ./bin/spark-shell --conf spark.sql.debug.countJoinKeys=true

    val a = spark.range(10000)
      .withColumn("key1", concat(lit("key_"), $"id" % 10))
    // Ids 10 and above all map to "key_10", so that key dominates b
    val b = spark.range(10000)
      .withColumn("key2", concat(lit("key_"), when($"id" > 10, 10).otherwise($"id")))
    val c = a.join(b, $"key1" === $"key2")
    c.count

[image: image.png]

Here is my prototype code <https://github.com/robreeves/spark/pull/1/files>. For counting, it uses the count-min sketch algorithm so that it runs in constant memory; I reused the count-min sketch implementation that already exists in Spark. Note that there are other performance optimizations I still need to make before contributing it.

This feature would be turned off by default, both because it has a performance impact and to prevent personally identifiable information from leaking to the Spark History Server without the job owner's knowledge.

*Extensions*
Being able to identify hot keys inline could be valuable for future optimizations, such as implementing Tree-Join (paper <https://arxiv.org/pdf/2209.08475>). Any extension beyond debugging requires that the performance overhead of inline counting is not significant.

Thanks for taking a look!
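For readers less familiar with the technique, here is a minimal, self-contained Scala sketch of how top-K counting can be layered on a count-min sketch. It is an illustration under stated assumptions, not the prototype's code and not Spark's `org.apache.spark.util.sketch.CountMinSketch`: the class `TopKSketch`, its `depth`/`width`/`k` parameters, the per-row MurmurHash3 seeds, and the candidate-eviction policy are all hypothetical choices made for this example.

```scala
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

// Hypothetical, simplified heavy-hitter tracker: a count-min sketch plus a
// bounded map of top-K candidates. Not Spark's CountMinSketch.
class TopKSketch(depth: Int, width: Int, k: Int) {
  require(depth > 0 && width > 0 && k > 0, "depth, width and k must be positive")

  // depth rows x width counters: memory is fixed up front
  private val table = Array.ofDim[Long](depth, width)
  // Current top-K candidates with their last estimated counts
  private val candidates = mutable.Map.empty[String, Long]

  // Row r hashes with seed r, giving one hash function per row
  private def bucket(key: String, row: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key, row), width)

  // Count-min estimate: the smallest counter across rows.
  // It can overestimate (hash collisions) but never underestimates.
  def estimate(key: String): Long =
    (0 until depth).map(r => table(r)(bucket(key, r))).min

  def add(key: String): Unit = {
    for (r <- 0 until depth) table(r)(bucket(key, r)) += 1
    val est = estimate(key)
    if (candidates.contains(key) || candidates.size < k) {
      candidates(key) = est
    } else {
      // Replace the weakest candidate if this key now looks heavier
      val (minKey, minCount) = candidates.minBy(_._2)
      if (est > minCount) {
        candidates.remove(minKey)
        candidates(key) = est
      }
    }
  }

  // Candidates sorted by estimated count, highest first
  def topK: Seq[(String, Long)] = candidates.toSeq.sortBy(-_._2)
}
```

Memory stays bounded by `depth * width` counters plus `k` candidates no matter how many distinct keys pass through, which is what makes constant-memory inline counting plausible. For instance, feeding it the `key2` values from the example above with `new TopKSketch(depth = 5, width = 1024, k = 3)` should report `key_10` first, with an estimate of at least 9990. Reported counts are upper bounds, and a single instance like this would still need to be merged across tasks, which this sketch leaves out.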
- Rob