Hamel Ajay Kothari created SPARK-17006:
------------------------------------------
Summary: WithColumn Performance Degrades with Number of Invocations
Key: SPARK-17006
URL: https://issues.apache.org/jira/browse/SPARK-17006
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.0.0
Reporter: Hamel Ajay Kothari
Consider the following test case. We add a batch of roughly 100 columns to a
DataFrame with withColumn (99 per batch, since Range(1, 100) excludes its upper
bound), then add three more batches of the same size. Each batch is drastically
slower than the one before it. If we rebuild the DataFrame from its optimized
plan before adding the next batch, performance is drastically better.
The code:
{code}
// Base DataFrame with a single "value" column.
val raw = sc.parallelize(Range(1, 100)).toDF
val s1 = System.nanoTime()
// Batches 1 to 4: each adds 99 columns, one withColumn call per column.
val mapped = Range(1, 100).foldLeft(raw) { (df, i) =>
  df.withColumn(s"value${i}", df("value") + i)
}
val s2 = System.nanoTime()
val mapped2 = Range(1, 100).foldLeft(mapped) { (df, i) =>
  df.withColumn(s"value${i}_2", df("value") + i)
}
val s3 = System.nanoTime()
val mapped3 = Range(1, 100).foldLeft(mapped2) { (df, i) =>
  df.withColumn(s"value${i}_3", df("value") + i)
}
val s4 = System.nanoTime()
val mapped4 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
  df.withColumn(s"value${i}_4", df("value") + i)
}
val s5 = System.nanoTime()
// Rebuild mapped3 from its optimized plan (timed separately as stage 5).
val optimizedPlan = mapped3.queryExecution.optimizedPlan
val optimizedMapped3 = new org.apache.spark.sql.DataFrame(spark, optimizedPlan,
  org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
val s6 = System.nanoTime()
// Fourth batch again, this time on top of the optimized plan.
val mapped5 = Range(1, 100).foldLeft(optimizedMapped3) { (df, i) =>
  df.withColumn(s"value${i}_4", df("value") + i)
}
val s7 = System.nanoTime()
// Fourth batch on the original mapped3 again, to rule out caching effects.
val mapped6 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
  df.withColumn(s"value${i}_4", df("value") + i)
}
val s8 = System.nanoTime()
// Fourth batch on a DataFrame rebuilt from the analyzed (not optimized) plan.
val analyzedPlan = mapped3.queryExecution.analyzed
val analyzedMapped4 = new org.apache.spark.sql.DataFrame(spark, analyzedPlan,
  org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
val mapped7 = Range(1, 100).foldLeft(analyzedMapped4) { (df, i) =>
  df.withColumn(s"value${i}_4", df("value") + i)
}
val s9 = System.nanoTime()
val secondsToNanos = 1000*1000*1000.0
val stage1 = (s2-s1)/secondsToNanos
val stage2 = (s3-s2)/secondsToNanos
val stage3 = (s4-s3)/secondsToNanos
val stage4 = (s5-s4)/secondsToNanos
val stage5 = (s6-s5)/secondsToNanos
val stage6 = (s7-s6)/secondsToNanos
val stage7 = (s8-s7)/secondsToNanos
val stage8 = (s9-s8)/secondsToNanos
println(s"First 100: ${stage1} seconds")
println(s"Second 100: ${stage2} seconds")
println(s"Third 100: ${stage3} seconds")
println(s"Fourth 100: ${stage4} seconds")
println(s"Fourth 100 Optimization time: ${stage5} seconds")
println(s"Fourth 100 Optimized ${stage6} seconds")
println("Fourth Unoptimized (to make sure no caching/etc takes place, " +
  s"reusing analyzed etc): ${stage7} seconds")
println(s"Fourth 100 using analyzed plan: ${stage8} seconds")
{code}
This results in the following performance:
{code}
First 100: 4.873489454 seconds
Second 100: 14.982028303 seconds
Third 100: 38.775467952 seconds
Fourth 100: 73.429119675 seconds
Fourth 100 Optimization time: 1.777374175 seconds
Fourth 100 Optimized 22.514489934 seconds
Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed etc): 69.616112734 seconds
Fourth 100 using analyzed plan: 67.641982709 seconds
{code}
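To put the timings above in perspective, here is a small sketch that computes
the slowdown from one batch to the next and the gain from building on the
optimized plan. The object and value names are made up for illustration; the
numbers are copied from the results above.

```scala
object SlowdownRatios {
  // Wall-clock seconds for batches 1 to 4, copied from the results above.
  val batchSeconds: List[Double] =
    List(4.873489454, 14.982028303, 38.775467952, 73.429119675)

  // Ratio of each batch's time to the time of the batch before it.
  val ratios: List[Double] =
    batchSeconds.sliding(2).map { case Seq(prev, next) => next / prev }.toList

  // Fourth batch on the regular plan vs. fourth batch on the optimized plan.
  val optimizedSpeedup: Double = 73.429119675 / 22.514489934

  def main(args: Array[String]): Unit = {
    ratios.zipWithIndex.foreach { case (r, idx) =>
      println(f"Batch ${idx + 2} vs batch ${idx + 1}: $r%.2fx slower")
    }
    println(f"Optimized plan speedup on fourth batch: $optimizedSpeedup%.2fx")
  }
}
```

Each batch takes roughly 2x to 3x as long as the previous one, and the fourth
batch is a bit over 3x faster on top of the optimized plan, even after adding
the 1.8 seconds spent producing that plan.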
Now, I suspect that we can't just substitute the optimized plan for the logical
plan, because we would lose information that may be useful for optimization
later. But I suspect there is something we can do, at least in the case of
Projects, that would help.
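For comparison, a user-side sketch (not the engine change suggested above, and
not timed as part of this report) would be to build a whole batch of columns
first and add them with a single select, so that one batch creates one new
DataFrame instead of 99. The object name, the helper addColumnsAtOnce and its
suffix parameter are hypothetical. Unlike withColumn, this does not replace an
existing column of the same name, so it assumes the new names are unique.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

object SingleSelect {
  // Hypothetical helper: adds value1<suffix> .. value99<suffix> in one select
  // rather than through 99 chained withColumn calls.
  // Assumes df has a "value" column and none of the new names exist yet.
  def addColumnsAtOnce(df: DataFrame, suffix: String): DataFrame = {
    val added: Seq[Column] = Range(1, 100).map { i =>
      (col("value") + i).as(s"value${i}${suffix}")
    }
    // Keep every existing column and append the new ones.
    df.select(col("*") +: added: _*)
  }
}
```

With the test case above this would be called as, for example,
SingleSelect.addColumnsAtOnce(mapped3, "_4").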