This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4e61de4 [SPARK-27863][SQL] Metadata files and temporary files should not be counted as data files
4e61de4 is described below
commit 4e61de4380ba8f589f202b889935c93338ea520f
Author: Yuming Wang <[email protected]>
AuthorDate: Tue May 28 09:28:35 2019 -0700
[SPARK-27863][SQL] Metadata files and temporary files should not be counted as data files
## What changes were proposed in this pull request?
[`DataSourceUtils.isDataPath(path)`](https://github.com/apache/spark/blob/v2.4.3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L95)
should be `DataSourceUtils.isDataPath(status.getPath)`.
This PR fixes that issue.
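
To make the one-token difference concrete, here is a minimal sketch of the buggy loop (illustrative only: `SizeSketch` and `dirSize` are hypothetical stand-ins for the size calculation in `CommandUtils`, and the inlined `isDataPath` mirrors `DataSourceUtils.isDataPath`, which treats names starting with `_` or `.` as non-data files):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

object SizeSketch {
  // Mirrors DataSourceUtils.isDataPath: metadata and hidden files
  // (names starting with "_" or ".") are not data files.
  def isDataPath(path: Path): Boolean = {
    val name = path.getName
    !(name.startsWith("_") || name.startsWith("."))
  }

  // Simplified: the real code recurses into subdirectories.
  def dirSize(fs: FileSystem, dir: Path, stagingDir: String): Long = {
    fs.listStatus(dir).map { status =>
      // The bug: the old code called isDataPath(dir) on the parent directory,
      // whose name never starts with "_" or ".", so children such as
      // `_metadata` or staging files were still counted. The fix filters on
      // each child instead: isDataPath(status.getPath).
      if (!status.getPath.getName.startsWith(stagingDir) && isDataPath(status.getPath)) {
        status.getLen
      } else {
        0L
      }
    }.sum
  }
}
```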
## How was this patch tested?
Unit tests.
Closes #24725 from wangyum/SPARK-27863.
Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/sql/execution/command/CommandUtils.scala | 11 ++++---
.../spark/sql/StatisticsCollectionSuite.scala | 38 +++++++++++++++++++++-
.../spark/sql/StatisticsCollectionTestBase.scala | 4 +++
.../apache/spark/sql/hive/StatisticsSuite.scala | 2 +-
4 files changed, 48 insertions(+), 7 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 70e7cd9..cac2519 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -64,9 +64,7 @@ object CommandUtils extends Logging {
       val paths = partitions.map(x => new Path(x.storage.locationUri.get))
       val stagingDir = sessionState.conf.getConfString("hive.exec.stagingdir", ".hive-staging")
       val pathFilter = new PathFilter with Serializable {
-        override def accept(path: Path): Boolean = {
-          DataSourceUtils.isDataPath(path) && !path.getName.startsWith(stagingDir)
-        }
+        override def accept(path: Path): Boolean = isDataPath(path, stagingDir)
       }
       val fileStatusSeq = InMemoryFileIndex.bulkListLeafFiles(
         paths, sessionState.newHadoopConf(), pathFilter, spark)
@@ -98,8 +96,7 @@ object CommandUtils extends Logging {
       val size = if (fileStatus.isDirectory) {
         fs.listStatus(path)
           .map { status =>
-            if (!status.getPath.getName.startsWith(stagingDir) &&
-              DataSourceUtils.isDataPath(path)) {
+            if (isDataPath(status.getPath, stagingDir)) {
               getPathSize(fs, status.getPath)
             } else {
               0L
@@ -343,4 +340,8 @@ object CommandUtils extends Logging {
         cs.copy(histogram = Some(histogram))
     }
   }
+
+  private def isDataPath(path: Path, stagingDir: String): Boolean = {
+    !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index ba47b09..4c78f85 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql
 
-import java.io.File
+import java.io.{File, PrintWriter}
+import java.net.URI
 import java.util.TimeZone
 import java.util.concurrent.TimeUnit
@@ -614,4 +615,39 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  test("Metadata files and temporary files should not be counted as data files") {
+    withTempDir { tempDir =>
+      val tableName = "t1"
+      val stagingDirName = ".test-staging-dir"
+      val tableLocation = s"${tempDir.toURI}/$tableName"
+      withSQLConf(
+        SQLConf.AUTO_SIZE_UPDATE_ENABLED.key -> "true",
+        "hive.exec.stagingdir" -> stagingDirName) {
+        withTable("t1") {
+          sql(s"CREATE TABLE $tableName(c1 BIGINT) USING PARQUET LOCATION '$tableLocation'")
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val staging = new File(new URI(s"$tableLocation/$stagingDirName"))
+          Utils.tryWithResource(new PrintWriter(staging)) { stagingWriter =>
+            stagingWriter.write("12")
+          }
+
+          val metadata = new File(new URI(s"$tableLocation/_metadata"))
+          Utils.tryWithResource(new PrintWriter(metadata)) { metadataWriter =>
+            metadataWriter.write("1234")
+          }
+
+          sql(s"INSERT INTO TABLE $tableName VALUES(1)")
+
+          val stagingFileSize = staging.length()
+          val metadataFileSize = metadata.length()
+          val tableLocationSize = getDataSize(new File(new URI(tableLocation)))
+
+          val stats = checkTableStats(tableName, hasSizeInBytes = true, expectedRowCounts = None)
+          assert(stats.get.sizeInBytes === tableLocationSize - stagingFileSize - metadataFileSize)
+        }
+      }
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 346fb765..0a2abdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.{lang => jl}
+import java.io.File
 import java.sql.{Date, Timestamp}
 import java.util.concurrent.TimeUnit
@@ -294,6 +295,9 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     }
   }
 
+  def getDataSize(file: File): Long =
+    file.listFiles.filter(!_.getName.endsWith(".crc")).map(_.length).sum
+
   // This test will be run twice: with and without Hive support
   test("SPARK-18856: non-empty partitioned table should not report zero size") {
     withTable("ds_tbl", "hive_tbl") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 81cac9d..44b1362 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -120,7 +120,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     withTempDir { tempDir =>
       withTable("t1") {
         spark.range(5).write.mode(SaveMode.Overwrite).parquet(tempDir.getCanonicalPath)
-        val dataSize = tempDir.listFiles.filter(!_.getName.endsWith(".crc")).map(_.length).sum
+        val dataSize = getDataSize(tempDir)
         spark.sql(
           s"""
              |CREATE EXTERNAL TABLE t1(id BIGINT)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]