Yes, the counting uses a count-min sketch and publishes the top K keys above a skew threshold to an accumulator. The core of my prototype is in InlineApproxCountExec <https://github.com/robreeves/spark/pull/1/files#diff-3f324e7c94939ee254c49b3456a1685129fe6b01e2243343b666b8da6cc23b94R319>. The logical operator I added is InlineApproxCount <https://github.com/robreeves/spark/pull/1/files#diff-ae6713092c998474a5a8d6450aae3f06ad6c405ad608419a1aaa86fca2739311R27>. It can be used explicitly, or analyzer rules can inject it so the skew identification can be done during debugging without any code changes. A rough sketch of the idea follows below.
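
To give a feel for the mechanics without reading the PR, here is a rough, application-level sketch of the same idea: count key frequency per partition with Spark's existing CountMinSketch and publish candidate hot keys through an accumulator. This is not the prototype's code; the names (hotKeys, perPartitionThreshold), the per-partition thresholding, and the HashSet of seen keys are illustrative simplifications. It can be pasted into spark-shell, where `spark` and `sc` are provided by the shell.

// Hypothetical sketch, not the prototype's implementation.
import org.apache.spark.sql.functions._
import org.apache.spark.util.sketch.CountMinSketch

// Collects (key, estimatedCount) pairs reported by each partition.
val hotKeys = sc.collectionAccumulator[(String, Long)]("hotJoinKeys")
val perPartitionThreshold = 500L  // report keys whose per-partition estimate exceeds this

// Skewed data, mirroring the example from the original email: key_10 dominates.
val b = spark.range(100000)
  .withColumn("key2", concat(lit("key_"), when($"id" > 10, 10).otherwise($"id")))

// Per partition: feed every key into a count-min sketch (constant memory) and report
// keys whose estimated count crosses the threshold. Tracking distinct keys in a HashSet
// is a simplification; a bounded top-K structure would be used in practice.
b.select($"key2").as[String].rdd.foreachPartition { iter =>
  val sketch = CountMinSketch.create(0.001, 0.99, 42)
  val seen = scala.collection.mutable.HashSet.empty[String]
  iter.foreach { k => sketch.addString(k); seen += k }
  seen.foreach { k =>
    val est = sketch.estimateCount(k)
    if (est > perPartitionThreshold) hotKeys.add((k, est))
  }
}

// Driver side: merge the per-partition estimates and print the top 5 candidates.
val reports = hotKeys.value  // java.util.List[(String, Long)]
val merged = scala.collection.mutable.HashMap.empty[String, Long]
for (i <- 0 until reports.size) {
  val (k, est) = reports.get(i)
  merged.update(k, merged.getOrElse(k, 0L) + est)
}
merged.toSeq.sortBy(-_._2).take(5).foreach { case (k, c) => println(s"$k ~ $c rows") }

The prototype's InlineApproxCountExec does this inside the physical plan instead of at the application level and publishes the results through accumulators, so they show up in the Spark History Server as in the screenshot below.
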
On Fri, Jan 24, 2025 at 3:05 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

> Ok so the catalyst optimizer will use this method of inline key counting
> to provide the spark optimizer with prior notification, so it identifies the
> hot keys? What is this inline key counting based on? Likely the Count-Min Sketch
> algorithm!
>
> HTH
>
> Mich Talebzadeh,
> Architect | Data Science | Financial Crime | Forensic Analysis | GDPR
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
> On Fri, 24 Jan 2025 at 21:17, Rob Reeves <robert.p.ree...@gmail.com> wrote:
>
>> Hi Spark devs,
>>
>> I recently worked on a prototype to make it easier to identify the root
>> cause of data skew in Spark. I wanted to see if the community was
>> interested in it before working on contributing the changes (SPIP and PRs).
>>
>> *Problem*
>> When a query has data skew today, you see outlier tasks taking a long
>> time, but that's all. The natural next question is what is causing the
>> skew, but answering that takes a lot of manual debugging (e.g. modifying
>> queries, materializing intermediate datasets, etc.).
>>
>> *Solution*
>> I created a new inline operator to count and surface the top K most
>> common values for an expression using accumulators. This narrows down data
>> skew by telling the user which subquery has skew and which value in the
>> data is causing it. I also include an analyzer rule to enable inline key
>> counting for all joins. The motivation is to let someone debug data skew
>> in a production job without requiring code changes or deploying a test job.
>>
>> Here is an example of what it would look like in the Spark History Server
>> for this example code.
>>
>> $ ./bin/spark-shell --conf spark.sql.debug.countJoinKeys=true
>>
>> val a = spark.range(10000)
>>   .withColumn("key1", concat(lit("key_"), $"id" % 10))
>> val b = spark.range(10000)
>>   .withColumn("key2", concat(lit("key_"), when($"id" > 10, 10).otherwise($"id")))
>> val c = a.join(b, $"key1" === $"key2")
>> c.count
>>
>> [image: image.png]
>>
>> Here is my prototype code
>> <https://github.com/robreeves/spark/pull/1/files>. For counting, it uses
>> the count-min sketch algorithm so it can be done in constant memory. I
>> reused the count-min sketch implementation that already exists in Spark.
>> Note, I know there are other performance optimizations that still need to
>> be made before contributing it.
>>
>> This feature would be turned off by default since it has a performance
>> impact, and to prevent personally identifiable information from being
>> leaked to the Spark History Server without the job owner's knowledge.
>>
>> *Extensions*
>> Being able to identify the hot keys inline could be valuable for future
>> optimizations, such as implementing Tree-Join (paper
>> <https://arxiv.org/pdf/2209.08475>). Any extension beyond debugging has
>> the prerequisite that the inline counting overhead is not significant.
>>
>> Thanks for taking a look!
>>
>> - Rob