Repository: spark Updated Branches: refs/heads/master 8db4d95c0 -> d6f11a12a
[SPARK-18856][SQL] non-empty partitioned table should not report zero size ## What changes were proposed in this pull request? In `DataSource`, if the table is not analyzed, we will use 0 as the default value for table size. This is dangerous: we may broadcast a large table and cause an OOM. We should use `defaultSizeInBytes` instead. ## How was this patch tested? A new regression test. Author: Wenchen Fan <[email protected]> Closes #16280 from cloud-fan/bug. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6f11a12 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6f11a12 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6f11a12 Branch: refs/heads/master Commit: d6f11a12a146a863553c5a5e2023d79d4375ef3f Parents: 8db4d95 Author: Wenchen Fan <[email protected]> Authored: Wed Dec 14 21:03:56 2016 -0800 Committer: Reynold Xin <[email protected]> Committed: Wed Dec 14 21:03:56 2016 -0800 ---------------------------------------------------------------------- .../sql/execution/datasources/DataSource.scala | 3 ++- .../spark/sql/StatisticsCollectionSuite.scala | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d6f11a12/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 9250d0e..5245c14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -388,10 +388,11 @@ case class DataSource( val fileCatalog = if 
(sparkSession.sqlContext.conf.manageFilesourcePartitions && catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog) { + val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes new CatalogFileIndex( sparkSession, catalogTable.get, - catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(0L)) + catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize)) } else { new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(partitionSchema)) } http://git-wip-us.apache.org/repos/asf/spark/blob/d6f11a12/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala index 0740849..c663b31 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala @@ -26,6 +26,7 @@ import scala.util.Random import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} import org.apache.spark.sql.test.SQLTestData.ArrayData import org.apache.spark.sql.types._ @@ -176,6 +177,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared * when using the Hive external catalog) as well as in the sql/core module. 
*/ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils { + import testImplicits._ private val dec1 = new java.math.BigDecimal("1.000000000000000000") private val dec2 = new java.math.BigDecimal("8.000000000000000000") @@ -242,4 +244,20 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils } } } + + // This test will be run twice: with and without Hive support + test("SPARK-18856: non-empty partitioned table should not report zero size") { + withTable("ds_tbl", "hive_tbl") { + spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl") + val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.statistics + assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.") + + if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") { + sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)") + sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1") + val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.statistics + assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.") + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
