Repository: spark Updated Branches: refs/heads/branch-2.0 d65707b7f -> 4c0af3bbd
[SPARK-15392][SQL] fix default value of size estimation of logical plan ## What changes were proposed in this pull request? We use autoBroadcastJoinThreshold + 1L as the default value of size estimation, that is not good in 2.0, because we will calculate the size based on size of schema, then the estimation could be less than autoBroadcastJoinThreshold if you have an SELECT on top of an DataFrame created from RDD. This PR change the default value to Long.MaxValue. ## How was this patch tested? Added regression tests. Author: Davies Liu <[email protected]> Closes #13179 from davies/fix_default_size. (cherry picked from commit fc29b896dae08b957ed15fa681b46162600a4050) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c0af3bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c0af3bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c0af3bb Branch: refs/heads/branch-2.0 Commit: 4c0af3bbd5a6e88b1fedf0d9c624bbdb82c1aa40 Parents: d65707b Author: Davies Liu <[email protected]> Authored: Wed May 18 15:45:59 2016 -0700 Committer: Reynold Xin <[email protected]> Committed: Wed May 18 15:46:10 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 3 +-- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 9 +++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4c0af3bb/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7933d12..a7f4613 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -605,8 +605,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) - def defaultSizeInBytes: Long = - getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L) + def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue) def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING) http://git-wip-us.apache.org/repos/asf/spark/blob/4c0af3bb/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index f573abf..df029e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1476,4 +1476,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { getMessage() assert(e1.startsWith("Path does not exist")) } + + test("SPARK-15392: DataFrame created from RDD should not be broadcasted") { + val rdd = sparkContext.range(1, 100).map(i => Row(i, i)) + val df = spark.createDataFrame(rdd, new StructType().add("a", LongType).add("b", LongType)) + assert(df.queryExecution.analyzed.statistics.sizeInBytes > + spark.wrapped.conf.autoBroadcastJoinThreshold) + assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes > + spark.wrapped.conf.autoBroadcastJoinThreshold) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
