[
https://issues.apache.org/jira/browse/SPARK-17006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hamel Ajay Kothari updated SPARK-17006:
---------------------------------------
Description:
Consider the following test case. We create a dataframe with 100 withColumn
statements, then 100 more, then 100 more, then 100 more. Each time we do this
it gets slower pretty drastically. If we sub in the optimized plan, we end up
with drastically better performance.
Consider the following code:
{code}
val raw = sc.parallelize(Range(1, 100)).toDF
val s1 = System.nanoTime()
var mapped = Range(1, 100).foldLeft(raw) { (df, i) =>
df.withColumn(s"value${i}", df("value") + i)
}
val s2 = System.nanoTime()
val mapped2 = Range(1, 100).foldLeft(mapped) { (df, i) =>
df.withColumn(s"value${i}_2", df("value") + i)
}
val s3 = System.nanoTime()
val mapped3 = Range(1, 100).foldLeft(mapped2) { (df, i) =>
df.withColumn(s"value${i}_3", df("value") + i)
}
val s4 = System.nanoTime()
val mapped4 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s5 = System.nanoTime()
val plan = mapped3.queryExecution.optimizedPlan
val optimizedMapped3 = new org.apache.spark.sql.DataFrame(spark, plan,
org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
val s6 = System.nanoTime()
val mapped5 = Range(1, 100).foldLeft(optimizedMapped3) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s7 = System.nanoTime()
val mapped6 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s8 = System.nanoTime()
val plan = mapped3.queryExecution.analyzed
val analyzedMapped4 = new org.apache.spark.sql.DataFrame(spark, plan,
org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
val mapped7 = Range(1, 100).foldLeft(analyzedMapped4) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s9 = System.nanoTime()
val secondsToNanos = 1000*1000*1000.0
val stage1 = (s2-s1)/secondsToNanos
val stage2 = (s3-s2)/secondsToNanos
val stage3 = (s4-s3)/secondsToNanos
val stage4 = (s5-s4)/secondsToNanos
val stage5 = (s6-s5)/secondsToNanos
val stage6 = (s7-s6)/secondsToNanos
val stage7 = (s8-s7)/secondsToNanos
val stage8 = (s9-s8)/secondsToNanos
println(s"First 100: ${stage1}")
println(s"Second 100: ${stage2}")
println(s"Third 100: ${stage3}")
println(s"Fourth 100: ${stage4}")
println(s"Fourth 100 Optimization time: ${stage5}")
println(s"Fourth 100 Optimized ${stage6}")
println(s"Fourth Unoptimized (to make sure no caching/etc takes place, reusing
analyzed etc: ${stage7}")
println(s"Fourth selects: ${stage8}")
{code}
This results in the following performance:
{code}
First 100: 4.873489454
Second 100: 14.982028303 seconds
Third 100: 38.775467952 seconds
Fourth 100: 73.429119675 seconds
Fourth 100 Optimization time: 1.777374175 seconds
Fourth 100 Optimized 22.514489934 seconds
Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed
etc: 69.616112734 seconds
Fourth 100 using analyzed plan: 67.641982709 seconds
{code}
Now, I suspect that we can't just sub in the optimized plan for the logical
plan because we lose a bunch of information which may be useful for
optimization later. But, I suspect there's something we can do in the case of
Projects at least that might be useful.
was:
Consider the following test case. We create a dataframe with 100 withColumn
statements, then 100 more, then 100 more, then 100 more. Each time we do this
it gets slower pretty drastically. If we sub in the optimized plan, we end up
with drastically better performance.
Consider the following code:
{code}
var raw = sc.parallelize(Range(1, 100)).toDF
val s1 = System.nanoTime()
var mapped = Range(1, 100).foldLeft(raw) { (df, i) =>
df.withColumn(s"value${i}", df("value") + i)
}
val s2 = System.nanoTime()
val mapped2 = Range(1, 100).foldLeft(mapped) { (df, i) =>
df.withColumn(s"value${i}_2", df("value") + i)
}
val s3 = System.nanoTime()
val mapped3 = Range(1, 100).foldLeft(mapped2) { (df, i) =>
df.withColumn(s"value${i}_3", df("value") + i)
}
val s4 = System.nanoTime()
val mapped4 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s5 = System.nanoTime()
val plan = mapped3.queryExecution.optimizedPlan
val optimizedMapped3 = new org.apache.spark.sql.DataFrame(spark, plan,
org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
val s6 = System.nanoTime()
val mapped5 = Range(1, 100).foldLeft(optimizedMapped3) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s7 = System.nanoTime()
val mapped6 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s8 = System.nanoTime()
val plan = mapped3.queryExecution.analyzed
val analyzedMapped4 = new org.apache.spark.sql.DataFrame(spark, plan,
org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
val mapped7 = Range(1, 100).foldLeft(analyzedMapped4) { (df, i) =>
df.withColumn(s"value${i}_4", df("value") + i)
}
val s9 = System.nanoTime()
val secondsToNanos = 1000*1000*1000.0
val stage1 = (s2-s1)/secondsToNanos
val stage2 = (s3-s2)/secondsToNanos
val stage3 = (s4-s3)/secondsToNanos
val stage4 = (s5-s4)/secondsToNanos
val stage5 = (s6-s5)/secondsToNanos
val stage6 = (s7-s6)/secondsToNanos
val stage7 = (s8-s7)/secondsToNanos
val stage8 = (s9-s8)/secondsToNanos
println(s"First 100: ${stage1}")
println(s"Second 100: ${stage2}")
println(s"Third 100: ${stage3}")
println(s"Fourth 100: ${stage4}")
println(s"Fourth 100 Optimization time: ${stage5}")
println(s"Fourth 100 Optimized ${stage6}")
println(s"Fourth Unoptimized (to make sure no caching/etc takes place, reusing
analyzed etc: ${stage7}")
println(s"Fourth selects: ${stage8}")
{code}
This results in the following performance:
{code}
First 100: 4.873489454
Second 100: 14.982028303 seconds
Third 100: 38.775467952 seconds
Fourth 100: 73.429119675 seconds
Fourth 100 Optimization time: 1.777374175 seconds
Fourth 100 Optimized 22.514489934 seconds
Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed
etc: 69.616112734 seconds
Fourth 100 using analyzed plan: 67.641982709 seconds
{code}
Now, I suspect that we can't just sub in the optimized plan for the logical
plan because we lose a bunch of information which may be useful for
optimization later. But, I suspect there's something we can do in the case of
Projects at least that might be useful.
> WithColumn Performance Degrades with Number of Invocations
> ----------------------------------------------------------
>
> Key: SPARK-17006
> URL: https://issues.apache.org/jira/browse/SPARK-17006
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0
> Reporter: Hamel Ajay Kothari
>
> Consider the following test case. We create a dataframe with 100 withColumn
> statements, then 100 more, then 100 more, then 100 more. Each time we do this
> it gets slower pretty drastically. If we sub in the optimized plan, we end up
> with drastically better performance.
> Consider the following code:
> {code}
> val raw = sc.parallelize(Range(1, 100)).toDF
> val s1 = System.nanoTime()
> var mapped = Range(1, 100).foldLeft(raw) { (df, i) =>
> df.withColumn(s"value${i}", df("value") + i)
> }
> val s2 = System.nanoTime()
> val mapped2 = Range(1, 100).foldLeft(mapped) { (df, i) =>
> df.withColumn(s"value${i}_2", df("value") + i)
> }
> val s3 = System.nanoTime()
> val mapped3 = Range(1, 100).foldLeft(mapped2) { (df, i) =>
> df.withColumn(s"value${i}_3", df("value") + i)
> }
> val s4 = System.nanoTime()
> val mapped4 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
> df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s5 = System.nanoTime()
> val plan = mapped3.queryExecution.optimizedPlan
> val optimizedMapped3 = new org.apache.spark.sql.DataFrame(spark, plan,
> org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
> val s6 = System.nanoTime()
> val mapped5 = Range(1, 100).foldLeft(optimizedMapped3) { (df, i) =>
> df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s7 = System.nanoTime()
> val mapped6 = Range(1, 100).foldLeft(mapped3) { (df, i) =>
> df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s8 = System.nanoTime()
> val plan = mapped3.queryExecution.analyzed
> val analyzedMapped4 = new org.apache.spark.sql.DataFrame(spark, plan,
> org.apache.spark.sql.catalyst.encoders.RowEncoder(mapped3.schema))
> val mapped7 = Range(1, 100).foldLeft(analyzedMapped4) { (df, i) =>
> df.withColumn(s"value${i}_4", df("value") + i)
> }
> val s9 = System.nanoTime()
> val secondsToNanos = 1000*1000*1000.0
> val stage1 = (s2-s1)/secondsToNanos
> val stage2 = (s3-s2)/secondsToNanos
> val stage3 = (s4-s3)/secondsToNanos
> val stage4 = (s5-s4)/secondsToNanos
> val stage5 = (s6-s5)/secondsToNanos
> val stage6 = (s7-s6)/secondsToNanos
> val stage7 = (s8-s7)/secondsToNanos
> val stage8 = (s9-s8)/secondsToNanos
> println(s"First 100: ${stage1}")
> println(s"Second 100: ${stage2}")
> println(s"Third 100: ${stage3}")
> println(s"Fourth 100: ${stage4}")
> println(s"Fourth 100 Optimization time: ${stage5}")
> println(s"Fourth 100 Optimized ${stage6}")
> println(s"Fourth Unoptimized (to make sure no caching/etc takes place,
> reusing analyzed etc: ${stage7}")
> println(s"Fourth selects: ${stage8}")
> {code}
> This results in the following performance:
> {code}
> First 100: 4.873489454
> Second 100: 14.982028303 seconds
> Third 100: 38.775467952 seconds
> Fourth 100: 73.429119675 seconds
> Fourth 100 Optimization time: 1.777374175 seconds
> Fourth 100 Optimized 22.514489934 seconds
> Fourth Unoptimized (to make sure no caching/etc takes place, reusing analyzed
> etc: 69.616112734 seconds
> Fourth 100 using analyzed plan: 67.641982709 seconds
> {code}
> Now, I suspect that we can't just sub in the optimized plan for the logical
> plan because we lose a bunch of information which may be useful for
> optimization later. But, I suspect there's something we can do in the case of
> Projects at least that might be useful.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]