Repository: spark
Updated Branches:
  refs/heads/branch-2.1 b020ce408 -> 2b36f4943


[SPARK-17460][SQL] Make sure sizeInBytes in Statistics will not overflow

## What changes were proposed in this pull request?

1. In SparkStrategies.canBroadcast, add the check
plan.statistics.sizeInBytes >= 0.
2. In LocalRelation.statistics, compute the size as a BigInt when calculating
the statistics, so the value won't overflow.

## How was this patch tested?

Added a test case to make sure statistics.sizeInBytes won't overflow.

Author: Huaxin Gao <[email protected]>

Closes #16175 from huaxingao/spark-17460.

(cherry picked from commit c5172568b59b4cf1d3dc7ed8c17a9bea2ea2ab79)
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b36f494
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b36f494
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b36f494

Branch: refs/heads/branch-2.1
Commit: 2b36f4943051fafea0b12b662b4f4dab54806d26
Parents: b020ce4
Author: Huaxin Gao <[email protected]>
Authored: Sat Dec 10 22:41:40 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Sat Dec 10 22:42:11 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/logical/LocalRelation.scala  |  3 ++-
 .../org/apache/spark/sql/execution/SparkStrategies.scala  |  3 ++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala     |  4 ++--
 .../test/scala/org/apache/spark/sql/DatasetSuite.scala    | 10 ++++++++++
 4 files changed, 16 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index 890865d..91633f5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -75,7 +75,8 @@ case class LocalRelation(output: Seq[Attribute], data: 
Seq[InternalRow] = Nil)
   }
 
   override lazy val statistics =
-    Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * 
data.length)
+    Statistics(sizeInBytes =
+      (output.map(n => BigInt(n.dataType.defaultSize))).sum * data.length)
 
   def toSQL(inlineTableName: String): String = {
     require(data.nonEmpty)

http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index d88cbdf..b0bbcfc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -115,7 +115,8 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
      */
     private def canBroadcast(plan: LogicalPlan): Boolean = {
       plan.statistics.isBroadcastable ||
-        plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold
+        (plan.statistics.sizeInBytes >= 0 &&
+          plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold)
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 809b267..24c3d0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -136,7 +136,7 @@ object SQLConf {
       "That is to say by default the optimizer will not choose to broadcast a 
table unless it " +
       "knows for sure its size is small enough.")
     .longConf
-    .createWithDefault(-1)
+    .createWithDefault(Long.MaxValue)
 
   val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
     .doc("The default number of partitions to use when shuffling data for 
joins or aggregations.")
@@ -738,7 +738,7 @@ private[sql] class SQLConf extends Serializable with 
CatalystConf with Logging {
 
   def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED)
 
-  def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue)
+  def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES)
 
   def isParquetSchemaMergingEnabled: Boolean = 
getConf(PARQUET_SCHEMA_MERGING_ENABLED)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 1174d73..cb64aab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1060,6 +1060,16 @@ class DatasetSuite extends QueryTest with 
SharedSQLContext {
     }
     assert(e.getMessage.contains("Cannot create encoder for Option of Product 
type"))
   }
+
+  test ("SPARK-17460: the sizeInBytes in Statistics shouldn't overflow to a 
negative number") {
+    // Since the sizeInBytes in Statistics could exceed the limit of an Int, 
we should use BigInt
+    // instead of Int for avoiding possible overflow.
+    val ds = (0 to 10000).map( i =>
+      (i, Seq((i, Seq((i, "This is really not that long of a 
string")))))).toDS()
+    val sizeInBytes = ds.logicalPlan.statistics.sizeInBytes
+    // sizeInBytes is 2404280404, before the fix, it overflows to a negative 
number
+    assert(sizeInBytes > 0)
+  }
 }
 
 case class Generic[T](id: T, value: Double)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to