Repository: spark Updated Branches: refs/heads/branch-2.1 b020ce408 -> 2b36f4943
[SPARK-17460][SQL] Make sure sizeInBytes in Statistics will not overflow ## What changes were proposed in this pull request? 1. In SparkStrategies.canBroadcast, I will add the check plan.statistics.sizeInBytes >= 0 2. In LocalRelation.statistics, when calculating the statistics, I will change the size to BigInt so it won't overflow. ## How was this patch tested? I will add a test case to make sure the statistics.sizeInBytes won't overflow. Author: Huaxin Gao <[email protected]> Closes #16175 from huaxingao/spark-17460. (cherry picked from commit c5172568b59b4cf1d3dc7ed8c17a9bea2ea2ab79) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b36f494 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b36f494 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b36f494 Branch: refs/heads/branch-2.1 Commit: 2b36f4943051fafea0b12b662b4f4dab54806d26 Parents: b020ce4 Author: Huaxin Gao <[email protected]> Authored: Sat Dec 10 22:41:40 2016 +0800 Committer: Wenchen Fan <[email protected]> Committed: Sat Dec 10 22:42:11 2016 +0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/plans/logical/LocalRelation.scala | 3 ++- .../org/apache/spark/sql/execution/SparkStrategies.scala | 3 ++- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++-- .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 ++++++++++ 4 files changed, 16 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 890865d..91633f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -75,7 +75,8 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) } override lazy val statistics = - Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length) + Statistics(sizeInBytes = + (output.map(n => BigInt(n.dataType.defaultSize))).sum * data.length) def toSQL(inlineTableName: String): String = { require(data.nonEmpty) http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index d88cbdf..b0bbcfc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -115,7 +115,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { */ private def canBroadcast(plan: LogicalPlan): Boolean = { plan.statistics.isBroadcastable || - plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold + (plan.statistics.sizeInBytes >= 0 && + plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 809b267..24c3d0b 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -136,7 +136,7 @@ object SQLConf { "That is to say by default the optimizer will not choose to broadcast a table unless it " + "knows for sure its size is small enough.") .longConf - .createWithDefault(-1) + .createWithDefault(Long.MaxValue) val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions") .doc("The default number of partitions to use when shuffling data for joins or aggregations.") @@ -738,7 +738,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def enableRadixSort: Boolean = getConf(RADIX_SORT_ENABLED) - def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, Long.MaxValue) + def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES) def isParquetSchemaMergingEnabled: Boolean = getConf(PARQUET_SCHEMA_MERGING_ENABLED) http://git-wip-us.apache.org/repos/asf/spark/blob/2b36f494/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 1174d73..cb64aab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1060,6 +1060,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext { } assert(e.getMessage.contains("Cannot create encoder for Option of Product type")) } + + test ("SPARK-17460: the sizeInBytes in Statistics shouldn't overflow to a negative number") { + // Since the sizeInBytes in Statistics could exceed the limit of an Int, we should use BigInt + // instead of Int for avoiding possible overflow. 
+ val ds = (0 to 10000).map( i => + (i, Seq((i, Seq((i, "This is really not that long of a string")))))).toDS() + val sizeInBytes = ds.logicalPlan.statistics.sizeInBytes + // sizeInBytes is 2404280404, before the fix, it overflows to a negative number + assert(sizeInBytes > 0) + } } case class Generic[T](id: T, value: Double) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
