OK, so the Catalyst optimizer would use this inline key counting method to give the Spark optimizer advance notice, so it can identify the hot keys? What is this inline key counting based on? Likely the Count-Min Sketch algorithm!
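For reference, the core of a count-min sketch is small enough to show inline. Below is an illustrative, self-contained Java sketch of the data structure; this is not Spark's own implementation (the prototype reuses the one already in Spark), and the class name, constructor parameters, and hashing scheme here are made up for the example:

```java
import java.util.Random;

/**
 * Illustrative count-min sketch (not Spark's implementation).
 * depth hash rows of width counters give constant memory regardless of
 * key cardinality; estimates are upper bounds on the true count.
 */
final class CountMinSketch {
    private final int depth;
    private final int width;
    private final long[][] table;
    private final int[] seeds;

    CountMinSketch(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.table = new long[depth][width];
        this.seeds = new int[depth];
        Random r = new Random(seed);
        for (int i = 0; i < depth; i++) seeds[i] = r.nextInt();
    }

    // Per-row bucket index: mix the item's hash with a row-specific seed.
    private int bucket(int row, Object item) {
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16);
        return Math.floorMod(h, width);
    }

    // Record one occurrence: increment one counter in each row.
    void add(Object item) {
        for (int i = 0; i < depth; i++) table[i][bucket(i, item)]++;
    }

    // Estimate a count: minimum across rows; never underestimates,
    // may overestimate when unrelated keys collide in a bucket.
    long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < depth; i++) {
            min = Math.min(min, table[i][bucket(i, item)]);
        }
        return min;
    }
}
```

With a skewed stream (one key appearing far more often than the rest), the hot key's estimate dominates while memory stays fixed at depth × width counters.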
HTH

Mich Talebzadeh,
Architect | Data Science | Financial Crime | Forensic Analysis | GDPR

View my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

On Fri, 24 Jan 2025 at 21:17, Rob Reeves <robert.p.ree...@gmail.com> wrote:

> Hi Spark devs,
>
> I recently worked on a prototype to make it easier to identify the root
> cause of data skew in Spark. I wanted to see if the community is
> interested in it before working on contributing the changes (SPIP and PRs).
>
> *Problem*
> When a query has data skew today, you see outlier tasks taking a long time,
> but that's all. The natural next question is what is causing the skew, and
> answering it takes a lot of manual debugging (e.g. modifying queries,
> materializing intermediate datasets, etc.).
>
> *Solution*
> I created a new inline operator that counts and surfaces the top K most
> common values for an expression using accumulators. This narrows down data
> skew by telling the user which subquery has skew and which value in the
> data is causing it. I also include an analyzer rule to enable inline key
> counting for all joins. The motivation is for someone to be able to debug
> data skew in a production job without requiring code changes or deploying
> a test job.
>
> Here is an example of what it would look like in the Spark History Server
> for this example code:
>
> $ ./bin/spark-shell --conf spark.sql.debug.countJoinKeys=true
>
> val a = spark.range(10000)
>   .withColumn("key1", concat(lit("key_"), $"id" % 10))
> val b = spark.range(10000)
>   .withColumn("key2", concat(lit("key_"), when($"id" > 10, 10).otherwise($"id")))
> val c = a.join(b, $"key1" === $"key2")
> c.count
>
> [image: image.png]
>
> Here is my prototype code
> <https://github.com/robreeves/spark/pull/1/files>. For counting, it uses
> the count-min sketch algorithm so it can be done in constant memory. I
> reused the count-min sketch implementation that already exists in Spark.
> Note: I know there are other performance optimizations that still need to
> be made before contributing it.
>
> This feature would be off by default, both because it has a performance
> impact and to prevent personally identifiable information from being
> leaked to the Spark History Server without the job owner's knowledge.
>
> *Extensions*
> Being able to identify the hot keys inline could be valuable for future
> optimizations, such as implementing Tree-Join (paper
> <https://arxiv.org/pdf/2209.08475>). Any extension beyond debugging has
> the prerequisite that the performance overhead of inline counting is not
> significant.
>
> Thanks for taking a look!
>
> - Rob
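To make the skew in the quoted example concrete, here is a small Java sketch of what "count and surface the top K most common values" means for the key2 column above. It reproduces the key distribution from the quoted spark-shell snippet and counts it exactly with a HashMap for clarity; the actual prototype instead uses a count-min sketch plus accumulators for constant memory, and the class and method names here are hypothetical:

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class TopKeys {
    // Hypothetical helper: exact top-K key counts for a stream of join keys.
    // The prototype would report these via accumulators to the History Server.
    static List<Map.Entry<String, Long>> topK(Stream<String> keys, int k) {
        Map<String, Long> counts = new HashMap<>();
        keys.forEach(key -> counts.merge(key, 1L, Long::sum));
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(k)
                .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // key2 from the quoted example: every id > 10 collapses onto "key_10",
        // so that one key receives 9990 of the 10000 rows.
        Stream<String> key2 = LongStream.range(0, 10000)
                .mapToObj(id -> "key_" + (id > 10 ? 10 : id));
        System.out.println(topK(key2, 3)); // "key_10" dominates with 9990 rows
    }
}
```

Surfacing this top-K list next to the join in the UI is what lets a user jump straight from "this stage is slow" to "key_10 is the hot key in this subquery".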