Repository: spark
Updated Branches:
  refs/heads/master 2e1586f60 -> b449a1d6a


[SPARK-21079][SQL] Calculate total size of a partition table as a sum of 
individual partitions

## What changes were proposed in this pull request?

Storage URI of a partitioned table may or may not point to a directory under 
which individual partitions are stored. In fact, individual partitions may be 
located in totally unrelated directories. Before this change, ANALYZE TABLE 
table COMPUTE STATISTICS command calculated total size of a table by adding up 
sizes of files found under table's storage URI. This calculation could produce 
0 if partitions are stored elsewhere.

This change uses storage URIs of individual partitions to calculate the sizes 
of all partitions of a table and adds these up to produce the total size of a 
table.

CC: wzhfy

## How was this patch tested?

Added unit test.

Ran ANALYZE TABLE xxx COMPUTE STATISTICS on a partitioned Hive table and 
verified that sizeInBytes is calculated correctly. Before this change, the size 
would be zero.

Author: Masha Basmanova <[email protected]>

Closes #18309 from mbasmanova/mbasmanova-analyze-part-table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b449a1d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b449a1d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b449a1d6

Branch: refs/heads/master
Commit: b449a1d6aa322a50cf221cd7a2ae85a91d6c7e9f
Parents: 2e1586f
Author: Masha Basmanova <[email protected]>
Authored: Sat Jun 24 22:49:35 2017 -0700
Committer: gatorsmile <[email protected]>
Committed: Sat Jun 24 22:49:35 2017 -0700

----------------------------------------------------------------------
 .../execution/command/AnalyzeTableCommand.scala | 29 ++++++--
 .../apache/spark/sql/hive/StatisticsSuite.scala | 72 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b449a1d6/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 3c59b98..06e588f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.net.URI
+
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -81,6 +83,21 @@ case class AnalyzeTableCommand(
 object AnalyzeTableCommand extends Logging {
 
   def calculateTotalSize(sessionState: SessionState, catalogTable: 
CatalogTable): Long = {
+    if (catalogTable.partitionColumnNames.isEmpty) {
+      calculateLocationSize(sessionState, catalogTable.identifier, 
catalogTable.storage.locationUri)
+    } else {
+      // Calculate table size as a sum of the visible partitions. See 
SPARK-21079
+      val partitions = 
sessionState.catalog.listPartitions(catalogTable.identifier)
+      partitions.map(p =>
+        calculateLocationSize(sessionState, catalogTable.identifier, 
p.storage.locationUri)
+      ).sum
+    }
+  }
+
+  private def calculateLocationSize(
+      sessionState: SessionState,
+      tableId: TableIdentifier,
+      locationUri: Option[URI]): Long = {
     // This method is mainly based on
     // 
org.apache.hadoop.hive.ql.stats.StatsUtils.getFileSizeForTable(HiveConf, Table)
     // in Hive 0.13 (except that we do not use fs.getContentSummary).
@@ -91,13 +108,13 @@ object AnalyzeTableCommand extends Logging {
     // countFileSize to count the table size.
     val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", 
".hive-staging")
 
-    def calculateTableSize(fs: FileSystem, path: Path): Long = {
+    def calculateLocationSize(fs: FileSystem, path: Path): Long = {
       val fileStatus = fs.getFileStatus(path)
       val size = if (fileStatus.isDirectory) {
         fs.listStatus(path)
           .map { status =>
             if (!status.getPath.getName.startsWith(stagingDir)) {
-              calculateTableSize(fs, status.getPath)
+              calculateLocationSize(fs, status.getPath)
             } else {
               0L
             }
@@ -109,16 +126,16 @@ object AnalyzeTableCommand extends Logging {
       size
     }
 
-    catalogTable.storage.locationUri.map { p =>
+    locationUri.map { p =>
       val path = new Path(p)
       try {
         val fs = path.getFileSystem(sessionState.newHadoopConf())
-        calculateTableSize(fs, path)
+        calculateLocationSize(fs, path)
       } catch {
         case NonFatal(e) =>
           logWarning(
-            s"Failed to get the size of table ${catalogTable.identifier.table} 
in the " +
-              s"database ${catalogTable.identifier.database} because of 
${e.toString}", e)
+            s"Failed to get the size of table ${tableId.table} in the " +
+              s"database ${tableId.database} because of ${e.toString}", e)
           0L
       }
     }.getOrElse(0L)

http://git-wip-us.apache.org/repos/asf/spark/blob/b449a1d6/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 279db9a..0ee18bb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class StatisticsSuite extends StatisticsCollectionTestBase with 
TestHiveSingleton {
 
@@ -128,6 +129,77 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
       TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
   }
 
+  test("SPARK-21079 - analyze table with location different than that of 
individual partitions") {
+    def queryTotalSize(tableName: String): BigInt =
+      spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
+
+    val tableName = "analyzeTable_part"
+    withTable(tableName) {
+      withTempPath { path =>
+        sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED 
BY (ds STRING)")
+
+        val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+        partitionDates.foreach { ds =>
+          sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') SELECT * 
FROM src")
+        }
+
+        sql(s"ALTER TABLE $tableName SET LOCATION '$path'")
+
+        sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+
+        assert(queryTotalSize(tableName) === BigInt(17436))
+      }
+    }
+  }
+
+  test("SPARK-21079 - analyze partitioned table with only a subset of 
partitions visible") {
+    def queryTotalSize(tableName: String): BigInt =
+      spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
+
+    val sourceTableName = "analyzeTable_part"
+    val tableName = "analyzeTable_part_vis"
+    withTable(sourceTableName, tableName) {
+      withTempPath { path =>
+          // Create a table with 3 partitions all located under a single 
top-level directory 'path'
+          sql(
+            s"""
+               |CREATE TABLE $sourceTableName (key STRING, value STRING)
+               |PARTITIONED BY (ds STRING)
+               |LOCATION '$path'
+             """.stripMargin)
+
+          val partitionDates = List("2010-01-01", "2010-01-02", "2010-01-03")
+          partitionDates.foreach { ds =>
+              sql(
+                s"""
+                   |INSERT INTO TABLE $sourceTableName PARTITION (ds='$ds')
+                   |SELECT * FROM src
+                 """.stripMargin)
+          }
+
+          // Create another table referring to the same location
+          sql(
+            s"""
+               |CREATE TABLE $tableName (key STRING, value STRING)
+               |PARTITIONED BY (ds STRING)
+               |LOCATION '$path'
+             """.stripMargin)
+
+          // Register only one of the partitions found on disk
+          val ds = partitionDates.head
+          sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')").collect()
+
+          // Analyze original table - expect 3 partitions
+          sql(s"ANALYZE TABLE $sourceTableName COMPUTE STATISTICS noscan")
+          assert(queryTotalSize(sourceTableName) === BigInt(3 * 5812))
+
+          // Analyze partial-copy table - expect only 1 partition
+          sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
+          assert(queryTotalSize(tableName) === BigInt(5812))
+        }
+    }
+  }
+
   test("analyzing views is not supported") {
     def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
       val err = intercept[AnalysisException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to