[ https://issues.apache.org/jira/browse/SPARK-35256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17343929#comment-17343929 ]
Ondrej Kokes commented on SPARK-35256:
--------------------------------------

I ran git bisect with the setup mentioned in this ticket and found this to be the offending commit: [https://github.com/apache/spark/commit/6fa80ed1dd43c2ecd092c10933330b501641c51b]

I attached some logs to this issue - first a log of the actual bisection and then a CSV with all the timings. That is, I would build Spark (Ubuntu 20.x, OpenJDK 8) and run the script; if it finished in under 50 seconds, the commit was good, above that it was bad. I first timed the script using 3.0.1 and 3.1.1 downloaded from PyPI to get reference numbers for the good/bad threshold.

> str_to_map + split performance regression
> -----------------------------------------
>
>                 Key: SPARK-35256
>                 URL: https://issues.apache.org/jira/browse/SPARK-35256
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Ondrej Kokes
>            Priority: Minor
>         Attachments: bisect_log.txt, bisect_timing.csv
>
> I'm seeing almost double the runtime between 3.0.1 and 3.1.1 in my pipeline
> that does mostly str_to_map, split, and a few other operations - all
> projections, no joins or aggregations (the aggregation is there only to
> trigger the pipeline). I cut it down to the simplest reproducible example
> I could - removing anything from it changes the runtime difference quite
> dramatically.
> (Even moving those two expressions from f.when to standalone columns makes
> the difference disappear.)
> {code:java}
> import time
> import os
>
> import pyspark
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as f
>
> if __name__ == '__main__':
>     print(pyspark.__version__)
>     spark = SparkSession.builder.getOrCreate()
>
>     filename = 'regression.csv'
>     if not os.path.isfile(filename):
>         with open(filename, 'wt') as fw:
>             fw.write('foo\n')
>             for _ in range(10_000_000):
>                 fw.write('foo=bar&baz=bak&bar=f,o,1:2:3\n')
>
>     df = spark.read.option('header', True).csv(filename)
>
>     t = time.time()
>     dd = (df
>           .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
>           .withColumn('extracted',
>                       # without this top-level split it is only 50% slower,
>                       # with it the runtime almost doubles
>                       f.split(f.split(f.col("my_map")["bar"], ",")[2], ":")[0]
>           )
>           .select(
>               f.when(
>                   f.col("extracted").startswith("foo"), f.col("extracted")
>               ).otherwise(
>                   f.concat(f.lit("foo"), f.col("extracted"))
>               ).alias("foo")
>           )
>     )
>     # dd.explain(True)
>     _ = dd.groupby("foo").count().count()
>     print("elapsed", time.time() - t)
> {code}
> Running this in 3.0.1 and 3.1.1 respectively (both installed from PyPI, on my
> local macOS):
> {code:java}
> 3.0.1
> elapsed 21.262351036071777
> 3.1.1
> elapsed 40.26582884788513
> {code}
> (Meaning the transformation took 21 seconds in 3.0.1 and 40 seconds in 3.1.1.)
> Feel free to make the CSV smaller to get a quicker feedback loop - it scales
> linearly (I developed this with 2M rows).
> It might be related to my previous issue - SPARK-32989 - there are similar
> operations, nesting, etc. (Splitting on the original column, not on a map,
> makes the difference disappear.)
> I tried dissecting the queries in SparkUI and via explain, but both 3.0.1 and
> 3.1.1 produced identical plans.
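For readers unfamiliar with str_to_map, the per-row transformation the repro performs can be mimicked in plain Python. This is only an illustration of the semantics on the sample input line, not the Spark implementation:

```python
# Plain-Python illustration of what the pipeline computes for one row.
# str_to_map(foo, "&", "=") builds a map using "&" as the pair delimiter
# and "=" as the key/value delimiter; the nested splits then pull one
# token out of the map's "bar" entry.

def transform(row: str) -> str:
    # str_to_map(foo, "&", "=")
    my_map = dict(pair.split("=", 1) for pair in row.split("&"))
    # split(split(my_map["bar"], ",")[2], ":")[0]
    extracted = my_map["bar"].split(",")[2].split(":")[0]
    # when(extracted.startswith("foo"), extracted)
    #   .otherwise(concat("foo", extracted))
    return extracted if extracted.startswith("foo") else "foo" + extracted


print(transform("foo=bar&baz=bak&bar=f,o,1:2:3"))  # prints "foo1"
```

For the sample row, the "bar" entry is "f,o,1:2:3", the nested splits extract "1", and since "1" does not start with "foo", the output is "foo1". The regression is purely in runtime; both Spark versions produce the same results and, per the report, identical plans.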
--
This message was sent by Atlassian Jira
(v8.3.4#803005)