Repository: spark
Updated Branches:
  refs/heads/master 4e3cb7a5d -> 5ccecc078


[SPARK-15392][SQL] fix default value of size estimation of logical plan

## What changes were proposed in this pull request?

We use autoBroadcastJoinThreshold + 1L as the default value for size estimation.
That does not work well in 2.0, because the size is now calculated based on the
size of the schema, so the estimate could be less than autoBroadcastJoinThreshold
if you have a SELECT on top of a DataFrame created from an RDD.

This PR changes the default value to Long.MaxValue.

## How was this patch tested?

Added regression tests.

Author: Davies Liu <dav...@databricks.com>

Closes #13183 from davies/fix_default_size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ccecc07
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ccecc07
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ccecc07

Branch: refs/heads/master
Commit: 5ccecc078aa757d3f1f6632aa6df5659490f602f
Parents: 4e3cb7a
Author: Davies Liu <dav...@databricks.com>
Authored: Thu May 19 12:12:42 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu May 19 12:12:42 2016 -0700

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |  2 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  9 +++---
 .../scala/org/apache/spark/sql/JoinSuite.scala  |  2 +-
 .../org/apache/spark/sql/StatisticsSuite.scala  | 34 ++++++++++++++++++++
 4 files changed, 40 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ccecc07/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a68ef33..4fa799a 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -576,7 +576,7 @@ class DataFrame(object):
         >>> df_as2 = df.alias("df_as2")
         >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == 
col("df_as2.name"), 'inner')
         >>> joined_df.select("df_as1.name", "df_as2.name", 
"df_as2.age").collect()
-        [Row(name=u'Alice', name=u'Alice', age=2), Row(name=u'Bob', 
name=u'Bob', age=5)]
+        [Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', 
name=u'Alice', age=2)]
         """
         assert isinstance(alias, basestring), "alias should be a string"
         return DataFrame(getattr(self._jdf, "as")(alias), self.sql_ctx)

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccecc07/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 518430f..5d18689 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -120,8 +120,8 @@ object SQLConf {
       "nodes when performing a join.  By setting this value to -1 broadcasting 
can be disabled. " +
       "Note that currently statistics are only supported for Hive Metastore 
tables where the " +
       "command<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS 
noscan</code> has been run.")
-    .intConf
-    .createWithDefault(10 * 1024 * 1024)
+    .longConf
+    .createWithDefault(10L * 1024 * 1024)
 
   val DEFAULT_SIZE_IN_BYTES = SQLConfigBuilder("spark.sql.defaultSizeInBytes")
     .internal()
@@ -599,14 +599,13 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
   def subexpressionEliminationEnabled: Boolean =
     getConf(SUBEXPRESSION_ELIMINATION_ENABLED)
 
-  def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
+  def autoBroadcastJoinThreshold: Long = getConf(AUTO_BROADCASTJOIN_THRESHOLD)
 
   def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
 
-  def defaultSizeInBytes: Long =
-    getConf(DEFAULT_SIZE_IN_BYTES, autoBroadcastJoinThreshold + 1L)
+  def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
 
   def isParquetBinaryAsString: Boolean = getConf(PARQUET_BINARY_AS_STRING)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccecc07/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index a6b83b3..a5d8cb1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -438,7 +438,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     spark.cacheManager.clearCache()
     sql("CACHE TABLE testData")
 
-    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1000000000") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> 
Long.MaxValue.toString) {
       Seq(
         ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a",
           classOf[BroadcastHashJoinExec]),

http://git-wip-us.apache.org/repos/asf/spark/blob/5ccecc07/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
new file mode 100644
index 0000000..9523f6f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsSuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
+
+class StatisticsSuite extends QueryTest with SharedSQLContext {
+
+  test("SPARK-15392: DataFrame created from RDD should not be broadcasted") {
+    val rdd = sparkContext.range(1, 100).map(i => Row(i, i))
+    val df = spark.createDataFrame(rdd, new StructType().add("a", 
LongType).add("b", LongType))
+    assert(df.queryExecution.analyzed.statistics.sizeInBytes >
+      spark.wrapped.conf.autoBroadcastJoinThreshold)
+    assert(df.selectExpr("a").queryExecution.analyzed.statistics.sizeInBytes >
+      spark.wrapped.conf.autoBroadcastJoinThreshold)
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to