Ondrej Kokes created SPARK-35256:
------------------------------------

             Summary: str_to_map + split performance regression
                 Key: SPARK-35256
                 URL: https://issues.apache.org/jira/browse/SPARK-35256
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.1
            Reporter: Ondrej Kokes


I'm seeing almost double the runtime between 3.0.1 and 3.1.1 in my pipeline, which does mostly str_to_map, split and a few other operations - all projections, no joins; the only aggregation (the final groupby/count) is there just to trigger execution. I cut it down to the simplest reproducible example I could - removing almost anything from it changes the runtime difference quite dramatically (even moving the two expressions inside f.when to standalone columns makes the difference disappear; a sketch of that variant is below, after the timings).
{code:java}
import time
import os

import pyspark  
from pyspark.sql import SparkSession

import pyspark.sql.functions as f

if __name__ == '__main__':
    print(pyspark.__version__)
    spark = SparkSession.builder.getOrCreate()

    filename = 'regression.csv'
    if not os.path.isfile(filename):
        with open(filename, 'wt') as fw:
            fw.write('foo\n')
            for _ in range(10_000_000):
                fw.write('foo=bar&baz=bak&bar=f,o,1:2:3\n')

    df = spark.read.option('header', True).csv(filename)
    t = time.time()
    dd = (df
            .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
            .withColumn('extracted',
                            # without this top-level split (so just
                            # `f.split(f.col("my_map")["bar"], ",")[2]`),
                            # it's only 50% slower; with it, it's 100% slower
                            f.split(f.split(f.col("my_map")["bar"], ",")[2], ":")[0]
                       )
            .select(
                f.when(
                    f.col("extracted").startswith("foo"), f.col("extracted")
                ).otherwise(
                    f.concat(f.lit("foo"), f.col("extracted"))
                ).alias("foo")
            )
        )
    # dd.explain(True)
    _ = dd.groupby("foo").count().count()
    print("elapsed", time.time() - t)
{code}
Running this with 3.0.1 and 3.1.1 respectively (both installed from PyPI, on my local macOS) gives:
{code:java}
3.0.1
elapsed 21.262351036071777
3.1.1
elapsed 40.26582884788513
{code}
(Meaning the transformation took 21 seconds in 3.0.1 and 40 seconds in 3.1.1)
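
For reference, this is roughly the standalone-columns variant mentioned above - a sketch of the shape only, with the helper column names (kept, prefixed) being purely illustrative:
{code:java}
# sketch: precompute the two f.when branch expressions as standalone columns
# and only reference them inside when/otherwise - with this shape the
# difference between 3.0.1 and 3.1.1 goes away
dd2 = (df
        .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
        .withColumn('extracted',
                    f.split(f.split(f.col('my_map')['bar'], ',')[2], ':')[0])
        .withColumn('kept', f.col('extracted'))
        .withColumn('prefixed', f.concat(f.lit('foo'), f.col('extracted')))
        .select(
            f.when(f.col('extracted').startswith('foo'), f.col('kept'))
             .otherwise(f.col('prefixed'))
             .alias('foo')
        )
     )
{code}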

Feel free to make the CSV smaller for a quicker feedback loop - the runtime scales linearly with the number of rows (I developed this with 2M rows).

It might be related to my previous issue, SPARK-32989 - the operations there are similar (nested splits etc.). Notably, splitting on the original column rather than on the map value makes the difference disappear; a sketch of that variant follows.
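
This is roughly what I mean by splitting on the original column rather than on the map value - again just a sketch of the shape, everything else kept the same:
{code:java}
# sketch: split the raw input column instead of my_map["bar"] - with this
# change the runtime difference between 3.0.1 and 3.1.1 does not show up
dd3 = (df
        .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
        .withColumn('extracted',
                    f.split(f.split(f.col('foo'), ',')[2], ':')[0])
        .select(
            f.when(
                f.col('extracted').startswith('foo'), f.col('extracted')
            ).otherwise(
                f.concat(f.lit('foo'), f.col('extracted'))
            ).alias('foo')
        )
     )
{code}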

I tried dissecting the queries in the Spark UI and via explain(), but 3.0.1 and 3.1.1 produced identical plans.
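
For reference, the comparison was just the built-in explain output:
{code:java}
# the extended plans looked identical across 3.0.1 and 3.1.1
dd.explain(True)
# (extra suggestion, not part of my original comparison) per-operator
# breakdown, available since Spark 3.0
dd.explain(mode='formatted')
{code}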


