This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e61de4  [SPARK-27863][SQL] Metadata files and temporary files should 
not be counted as data files
4e61de4 is described below

commit 4e61de4380ba8f589f202b889935c93338ea520f
Author: Yuming Wang <[email protected]>
AuthorDate: Tue May 28 09:28:35 2019 -0700

    [SPARK-27863][SQL] Metadata files and temporary files should not be counted 
as data files
    
    ## What changes were proposed in this pull request?
    
[`DataSourceUtils.isDataPath(path)`](https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L95)
 should be `DataSourceUtils.isDataPath(status.getPath)`.
    
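    This PR fixes the issue by applying the data-path check to each listed child path (`status.getPath`) instead of the parent directory `path`.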
    
    ## How was this patch tested?
    
    Unit tests (a new test case added to `StatisticsCollectionSuite`).
    
    Closes #24725 from wangyum/SPARK-27863.
    
    Authored-by: Yuming Wang <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/execution/command/CommandUtils.scala | 11 ++++---
 .../spark/sql/StatisticsCollectionSuite.scala      | 38 +++++++++++++++++++++-
 .../spark/sql/StatisticsCollectionTestBase.scala   |  4 +++
 .../apache/spark/sql/hive/StatisticsSuite.scala    |  2 +-
 4 files changed, 48 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 70e7cd9..cac2519 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -64,9 +64,7 @@ object CommandUtils extends Logging {
         val paths = partitions.map(x => new Path(x.storage.locationUri.get))
         val stagingDir = 
sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
         val pathFilter = new PathFilter with Serializable {
-          override def accept(path: Path): Boolean = {
-            DataSourceUtils.isDataPath(path) && 
!path.getName.startsWith(stagingDir)
-          }
+          override def accept(path: Path): Boolean = isDataPath(path, 
stagingDir)
         }
         val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
           paths, sessionState.newHadoopConf(), pathFilter, spark)
@@ -98,8 +96,7 @@ object CommandUtils extends Logging {
       val size = if (fileStatus.isDirectory) {
         fs.listStatus(path)
           .map { status =>
-            if (!status.getPath.getName.startsWith(stagingDir) &&
-              DataSourceUtils.isDataPath(path)) {
+            if (isDataPath(status.getPath, stagingDir)) {
               getPathSize(fs, status.getPath)
             } else {
               0L
@@ -343,4 +340,8 @@ object CommandUtils extends Logging {
       cs.copy(histogram = Some(histogram))
     }
   }
+
+  private def isDataPath(path: Path, stagingDir: String): Boolean = {
+    !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ba47b09..4c78f85 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.io.File
+import java.io.{File, PrintWriter}
+import java.net.URI
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit
 
@@ -614,4 +615,39 @@ class StatisticsCollectionSuite extends 
StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Metadata files and temporary files should not be counted as data 
files") {
+    withTempDir { tempDir =>
+      val tableName = "t1"
+      val stagingDirName = ".test-staging-dir"
+      val tableLocation = s"${tempDir.toURI}/$tableName"
+      withSQLConf(
+        SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true",
+        "hive.exec.stagingdir" -> stagingDirName) {
+        withTable("t1") {
+          sql(s"CREATE TABLE $tableName(c1 BIGINT) USING PARQUET LOCATION 
'$tableLocation'")
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val staging = new File(new URI(s"$tableLocation/$stagingDirName"))
+          Utils.tryWithResource(new PrintWriter(staging)) { stagingWriter =>
+            stagingWriter.write("12")
+          }
+
+          val metadata = new File(new URI(s"$tableLocation/_metadata"))
+          Utils.tryWithResource(new PrintWriter(metadata)) { metadataWriter =>
+            metadataWriter.write("1234")
+          }
+
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val stagingFileSize = staging.length()
+          val metadataFileSize = metadata.length()
+          val tableLocationSize = getDataSize(new File(new URI(tableLocation)))
+
+          val stats = checkTableStats(tableName, hasSizeInBytes = true, 
expectedRowCounts = None)
+          assert(stats.get.sizeInBytes === tableLocationSize - stagingFileSize 
- metadataFileSize)
+        }
+      }
+    }
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 346fb765..0a2abdf 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.{lang => jl}
+import java.io.File
 import java.sql.{Date, Timestamp}
 import java.util.concurrent.TimeUnit
 
@@ -294,6 +295,9 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
     }
   }
 
+  def getDataSize(file: File): Long =
+    file.listFiles.filter(!_.getName.endsWith(".crc")).map(_.length).sum
+
   // This test will be run twice: with and without Hive support
   test("SPARK-18856: non-empty partitioned table should not report zero size") 
{
     withTable("ds_tbl", "hive_tbl") {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 81cac9d..44b1362 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -120,7 +120,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
     withTempDir { tempDir =>
       withTable("t1") {
         
spark.range(5).write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath)
-        val dataSize = 
tempDir.listFiles.filter(!_.getName.endsWith(".crc")).map(_.length).sum
+        val dataSize = getDataSize(tempDir)
         spark.sql(
           s"""
              |CREATE EXTERNAL TABLE t1(id BIGINT)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to