[
https://issues.apache.org/jira/browse/SPARK-17006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416453#comment-15416453
]
Dongjoon Hyun edited comment on SPARK-17006 at 8/11/16 3:15 AM:
----------------------------------------------------------------
Hi, [~hkothari].
It's an interesting experiment. By the way, the code is using `foldLeft` now.
What about the unrolled code? I mean something like the following:
{code}
df.withColumn(...).withColumn(...).withColumn(...).
{code}
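For instance, a minimal sketch of the unrolled form of the first loop in the test
case below (illustrative only; it assumes the same `raw` DataFrame and `value${i}`
column names as the report):
{code}
// Illustrative sketch: the foldLeft replaced by explicit chained calls.
// This builds the same chain of projections, just written out by hand.
val unrolled = raw
  .withColumn("value1", raw("value") + 1)
  .withColumn("value2", raw("value") + 2)
  .withColumn("value3", raw("value") + 3)
  // ... and so on up to value99
{code}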
Also, the root cause might be too many column-name resolutions. You can see the
following PR, which improved performance by 6x:
https://github.com/apache/spark/pull/14083
> WithColumn Performance Degrades with Number of Invocations
> ----------------------------------------------------------
>
> Key: SPARK-17006
> URL: https://issues.apache.org/jira/browse/SPARK-17006
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Hamel Ajay Kothari
>
> Consider the following test case. We build a dataframe with 100 withColumn
> statements, then 100 more, then 100 more, then 100 more. Each successive batch
> is drastically slower than the previous one. If we substitute in the optimized
> plan instead, we get drastically better performance.
> Consider the following code:
> {code}
> val raw = sc.parallelize(Range(1, 100)).toDF
> val s1 = System.nanoTime()
> var mapped = Range(1, 100).foldLeft(raw) { (df, i) =>
>   df.withColumn(s"value${i}", df("value") + i)
> }
> val s2 = System.nanoTime()
> val mapped2 = Range(1, 100).foldLeft(mapped) { (df, i) =>
>   df.withColumn(s"value${i}_2", df("value") + i)
> }
> val s3 = System.nanoTime()
> val mapped3 = Range(1, 100).foldLeft(mapped2) { (df, i) =>
>   df.withColumn(s"value${i}_3", df("value") + i)
> }
> val s4 = System.nanoTime()
> val mapped4 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
>   df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s5 = System.nanoTime()
> val plan = mapped3.queryExecution.optimizedPlan
> val optimizedMapped3 = new org.apache.spark.sql.DataFrame(spark, plan,
>   org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
> val s6 = System.nanoTime()
> val mapped5 = Range(1, 100).foldLeft(optimizedMapped3) { (df, i) =>
>   df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s7 = System.nanoTime()
> val mapped6 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
>   df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s8 = System.nanoTime()
> val plan = mapped3.queryExecution.analyzed
> val analyzedMapped4 = new org.apache.spark.sql.DataFrame(spark, plan,
>   org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
> val mapped7 = Range(1, 100).foldLeft(analyzedMapped4) { (df, i) =>
>   df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s9 = System.nanoTime()
> val secondsToNanos = 1000*1000*1000.0
> val stage1 = (s2-s1)/secondsToNanos
> val stage2 = (s3-s2)/secondsToNanos
> val stage3 = (s4-s3)/secondsToNanos
> val stage4 = (s5-s4)/secondsToNanos
> val stage5 = (s6-s5)/secondsToNanos
> val stage6 = (s7-s6)/secondsToNanos
> val stage7 = (s8-s7)/secondsToNanos
> val stage8 = (s9-s8)/secondsToNanos
> println(s"First 100: ${stage1}")
> println(s"Second 100: ${stage2}")
> println(s"Third 100: ${stage3}")
> println(s"Fourth 100: ${stage4}")
> println(s"Fourth 100 Optimization time: ${stage5}")
> println(s"Fourth 100 Optimized ${stage6}")
> println(s"Fourth Unoptimized (to make sure no caching/etc takes place,
> reusing analyzed etc: ${stage7}")
> println(s"Fourth selects: ${stage8}")
> {code}
> This results in the following performance:
> {code}
> First 100: 4.873489454 seconds
> Second 100: 14.982028303 seconds
> Third 100: 38.775467952 seconds
> Fourth 100: 73.429119675 seconds
> Fourth 100 Optimization time: 1.777374175 seconds
> Fourth 100 Optimized 22.514489934 seconds
> Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed etc: 69.616112734 seconds
> Fourth 100 using analyzed plan: 67.641982709 seconds
> {code}
> Now, I suspect that we can't just sub in the optimized plan for the logical
> plan because we lose a bunch of information which may be useful for
> optimization later. But, I suspect there's something we can do in the case of
> Projects at least that might be useful.
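As a hedged illustration of the Project-heavy plans described above (a workaround
sketch, not the fix this issue asks for), the snippet below assumes the same `raw`
DataFrame from the test case and adds the 99 derived columns in a single `select`,
so the analyzer sees one Project instead of a chain of 99 `withColumn` projections:
{code}
// Hedged sketch: build all derived columns in one projection.
import org.apache.spark.sql.functions.col

val newCols = Range(1, 100).map(i => (col("value") + i).as(s"value${i}"))
val singleProject = raw.select(col("*") +: newCols: _*)
{code}
Every `withColumn` call re-analyzes an ever-growing plan, which is consistent with
the superlinear timings reported above; doing the work in one projection avoids
that repeated analysis.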