This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9756fbaa8c9 [SPARK-38573][SQL] Support Auto Partition Statistics
Collection
9756fbaa8c9 is described below
commit 9756fbaa8c9c4648e8c40a2e687295502d7b1196
Author: Kazuyuki Tanimura <[email protected]>
AuthorDate: Fri Apr 15 09:18:07 2022 -0700
[SPARK-38573][SQL] Support Auto Partition Statistics Collection
### What changes were proposed in this pull request?
Currently https://issues.apache.org/jira/browse/SPARK-21127 supports
storing the aggregated stats automatically at table level with the config
`spark.sql.statistics.size.autoUpdate.enabled`.
This PR proposes to also update partition statistics automatically at the same
time, when the `spark.sql.statistics.size.autoUpdate.enabled` config is enabled.
### Why are the changes needed?
Supporting partition-level stats is useful for identifying which partitions are
outliers (skewed partitions), and the query optimizer works better with
partition-level stats in the case of partition pruning.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Updated unit tests
Closes #36067 from kazuyukitanimura/SPARK-38573.
Authored-by: Kazuyuki Tanimura <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
.../execution/command/AnalyzeColumnCommand.scala | 2 +-
.../spark/sql/execution/command/CommandUtils.scala | 28 +++++---
.../apache/spark/sql/execution/command/ddl.scala | 1 +
.../apache/spark/sql/hive/StatisticsSuite.scala | 83 +++++++++++++++++++++-
4 files changed, 103 insertions(+), 11 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 5cb347868b1..88bba7f5ec9 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -109,7 +109,7 @@ case class AnalyzeColumnCommand(
throw QueryCompilationErrors.analyzeTableNotSupportedOnViewsError()
}
} else {
- val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession,
tableMeta)
+ val (sizeInBytes, _) = CommandUtils.calculateTotalSize(sparkSession,
tableMeta)
val relation = sparkSession.table(tableIdent).logicalPlan
val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation,
columnNames, allColumns)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 312f17543ce..2154a5893dd 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable,
CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable,
CatalogTablePartition, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -57,12 +57,15 @@ object CommandUtils extends Logging {
val catalog = sparkSession.sessionState.catalog
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
val newTable = catalog.getTableMetadata(table.identifier)
- val newSize = CommandUtils.calculateTotalSize(sparkSession, newTable)
+ val (newSize, newPartitions) =
CommandUtils.calculateTotalSize(sparkSession, newTable)
val isNewStats = newTable.stats.map(newSize !=
_.sizeInBytes).getOrElse(true)
if (isNewStats) {
val newStats = CatalogStatistics(sizeInBytes = newSize)
catalog.alterTableStats(table.identifier, Some(newStats))
}
+ if (newPartitions.nonEmpty) {
+ catalog.alterPartitions(table.identifier, newPartitions)
+ }
} else if (table.stats.nonEmpty) {
catalog.alterTableStats(table.identifier, None)
} else {
@@ -71,22 +74,29 @@ object CommandUtils extends Logging {
}
}
- def calculateTotalSize(spark: SparkSession, catalogTable: CatalogTable):
BigInt = {
+ def calculateTotalSize(
+ spark: SparkSession,
+ catalogTable: CatalogTable): (BigInt, Seq[CatalogTablePartition]) = {
val sessionState = spark.sessionState
val startTime = System.nanoTime()
- val totalSize = if (catalogTable.partitionColumnNames.isEmpty) {
- calculateSingleLocationSize(sessionState, catalogTable.identifier,
- catalogTable.storage.locationUri)
+ val (totalSize, newPartitions) = if
(catalogTable.partitionColumnNames.isEmpty) {
+ (calculateSingleLocationSize(sessionState, catalogTable.identifier,
+ catalogTable.storage.locationUri), Seq())
} else {
// Calculate table size as a sum of the visible partitions. See
SPARK-21079
val partitions =
sessionState.catalog.listPartitions(catalogTable.identifier)
logInfo(s"Starting to calculate sizes for ${partitions.length}
partitions.")
val paths = partitions.map(_.storage.locationUri)
- calculateMultipleLocationSizes(spark, catalogTable.identifier, paths).sum
+ val sizes = calculateMultipleLocationSizes(spark,
catalogTable.identifier, paths)
+ val newPartitions = partitions.zipWithIndex.flatMap { case (p, idx) =>
+ val newStats = CommandUtils.compareAndGetNewStats(p.stats, sizes(idx),
None)
+ newStats.map(_ => p.copy(stats = newStats))
+ }
+ (sizes.sum, newPartitions)
}
logInfo(s"It took ${(System.nanoTime() - startTime) / (1000 * 1000)} ms to
calculate" +
s" the total size for table ${catalogTable.identifier}.")
- totalSize
+ (totalSize, newPartitions)
}
def calculateSingleLocationSize(
@@ -222,7 +232,7 @@ object CommandUtils extends Logging {
}
} else {
// Compute stats for the whole table
- val newTotalSize = CommandUtils.calculateTotalSize(sparkSession,
tableMeta)
+ val (newTotalSize, _) = CommandUtils.calculateTotalSize(sparkSession,
tableMeta)
val newRowCount =
if (noScan) None else
Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 14d0e9753f2..e9ec98e6d0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -480,6 +480,7 @@ case class AlterTableAddPartitionCommand(
if (addedSize > 0) {
val newStats = CatalogStatistics(sizeInBytes =
table.stats.get.sizeInBytes + addedSize)
catalog.alterTableStats(table.identifier, Some(newStats))
+ catalog.alterPartitions(table.identifier, parts)
}
} else {
// Re-calculating of table size including all partitions
diff --git
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 46acc9b2f0a..c689682a46b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -201,7 +201,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase
with TestHiveSingleto
.getTableMetadata(TableIdentifier(checkSizeTable))
HiveCatalogMetrics.reset()
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 0)
- val size = CommandUtils.calculateTotalSize(spark, tableMeta)
+ val (size, _) = CommandUtils.calculateTotalSize(spark, tableMeta)
assert(HiveCatalogMetrics.METRIC_PARALLEL_LISTING_JOB_COUNT.getCount() == 1)
assert(size === BigInt(17436))
}
@@ -984,6 +984,16 @@ class StatisticsSuite extends StatisticsCollectionTestBase
with TestHiveSingleto
assert(fetched2.get.colStats.isEmpty)
val statsProp = getStatsProperties(table)
assert(statsProp(STATISTICS_TOTAL_SIZE).toLong ==
fetched2.get.sizeInBytes)
+
+ // SPARK-38573: Support Partition Level Statistics Collection
+ val partStats1 = getPartitionStats(table, Map("ds" ->
"2008-04-08", "hr" -> "11"))
+ assert(partStats1.sizeInBytes > 0)
+ val partStats2 = getPartitionStats(table, Map("ds" ->
"2008-04-08", "hr" -> "12"))
+ assert(partStats2.sizeInBytes > 0)
+ val partStats3 = getPartitionStats(table, Map("ds" ->
"2008-04-09", "hr" -> "11"))
+ assert(partStats3.sizeInBytes > 0)
+ val partStats4 = getPartitionStats(table, Map("ds" ->
"2008-04-09", "hr" -> "12"))
+ assert(partStats4.sizeInBytes > 0)
} else {
assert(getStatsProperties(table).isEmpty)
}
@@ -1007,6 +1017,10 @@ class StatisticsSuite extends
StatisticsCollectionTestBase with TestHiveSingleto
assert(fetched4.get.colStats.isEmpty)
val statsProp = getStatsProperties(table)
assert(statsProp(STATISTICS_TOTAL_SIZE).toLong ==
fetched4.get.sizeInBytes)
+
+ // SPARK-38573: Support Partition Level Statistics Collection
+ val partStats3 = getPartitionStats(table, Map("ds" ->
"2008-04-09", "hr" -> "11"))
+ assert(partStats3.sizeInBytes > 0)
} else {
assert(getStatsProperties(table).isEmpty)
}
@@ -1529,4 +1543,71 @@ class StatisticsSuite extends
StatisticsCollectionTestBase with TestHiveSingleto
}
}
}
+
+ test("SPARK-38573: partition stats auto update for dynamic partitions") {
+ val table = "partition_stats_dynamic_partition"
+ Seq("hive", "parquet").foreach { source =>
+ withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") {
+ withTable(table) {
+ sql(s"CREATE TABLE $table (id INT, sp INT, dp INT) USING $source
PARTITIONED BY (sp, dp)")
+ sql(s"INSERT INTO $table PARTITION (sp=0, dp) VALUES (0, 0)")
+ sql(s"INSERT OVERWRITE TABLE $table PARTITION (sp=0, dp) SELECT id,
id FROM range(5)")
+ for (i <- 0 until 5) {
+ val partStats = getPartitionStats(table, Map("sp" -> s"0", "dp" ->
s"$i"))
+ assert(partStats.sizeInBytes > 0)
+ }
+ }
+ }
+ }
+ }
+
+ test("SPARK-38573: change partition stats after load/set/truncate data
command") {
+ val table = "partition_stats_load_set_truncate"
+ withSQLConf(SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true") {
+ withTable(table) {
+ sql(s"CREATE TABLE $table (i INT, j STRING) USING hive " +
+ "PARTITIONED BY (ds STRING, hr STRING)")
+
+ withTempPaths(numPaths = 2) { case Seq(dir1, dir2) =>
+ val partDir1 = new File(new File(dir1, "ds=2008-04-09"), "hr=11")
+ val file1 = new File(partDir1, "data")
+ file1.getParentFile.mkdirs()
+ Utils.tryWithResource(new PrintWriter(file1)) { writer =>
+ writer.write("1,a")
+ }
+
+ val partDir2 = new File(new File(dir2, "ds=2008-04-09"), "hr=12")
+ val file2 = new File(partDir2, "data")
+ file2.getParentFile.mkdirs()
+ Utils.tryWithResource(new PrintWriter(file2)) { writer =>
+ writer.write("1,a")
+ }
+
+ sql(s"""
+ |LOAD DATA INPATH '${file1.toURI.toString}' INTO TABLE $table
+ |PARTITION (ds='2008-04-09', hr='11')
+ """.stripMargin)
+ sql(s"ALTER TABLE $table ADD PARTITION (ds='2008-04-09', hr='12')")
+ sql(s"""
+ |ALTER TABLE $table PARTITION (ds='2008-04-09', hr='12')
+ |SET LOCATION '${partDir2.toURI.toString}'
+ |""".stripMargin)
+ val partStats1 = getPartitionStats(table, Map("ds" -> "2008-04-09",
"hr" -> "11"))
+ assert(partStats1.sizeInBytes > 0)
+ val partStats2 = getPartitionStats(table, Map("ds" -> "2008-04-09",
"hr" -> "12"))
+ assert(partStats2.sizeInBytes > 0)
+
+
+ sql(s"TRUNCATE TABLE $table PARTITION (ds='2008-04-09', hr='11')")
+ val partStats3 = getPartitionStats(table, Map("ds" -> "2008-04-09",
"hr" -> "11"))
+ assert(partStats3.sizeInBytes == 0)
+ val partStats4 = getPartitionStats(table, Map("ds" -> "2008-04-09",
"hr" -> "12"))
+ assert(partStats4.sizeInBytes > 0)
+ sql(s"TRUNCATE TABLE $table")
+ val partStats5 = getPartitionStats(table, Map("ds" -> "2008-04-09",
"hr" -> "12"))
+ assert(partStats5.sizeInBytes == 0)
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]