This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new d97d0ac Revert "[SPARK-25474][SQL][2.3] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"
d97d0ac is described below
commit d97d0acd4f18edb9a4b4365531a06924675e789d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Aug 18 07:45:24 2019 -0700
Revert "[SPARK-25474][SQL][2.3] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"
This reverts commit 416aba48b40d931da97e549fcffa86c47790e5f2.
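For context while reading the patch below: the reverted change had extended `spark.sql.statistics.fallBackToHdfs`, which is otherwise consulted only for Hive tables without statistics, to data source tables. A minimal sketch of toggling the flag, assuming an active SparkSession named `spark` (the session name is illustrative, not part of this commit):

    // Hedged sketch: turn on the HDFS-based size fallback for plan statistics.
    spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")
    // Equivalent SQL form:
    spark.sql("SET spark.sql.statistics.fallBackToHdfs=true")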
---
.../spark/sql/execution/command/CommandUtils.scala | 11 ------
.../execution/datasources/HadoopFsRelation.scala | 11 +-----
.../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++++++--
.../apache/spark/sql/hive/StatisticsSuite.scala | 40 ----------------------
4 files changed, 12 insertions(+), 64 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 7f1c6a9..c270486 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -137,15 +137,4 @@ object CommandUtils extends Logging {
    }
    newStats
  }
-
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
-    try {
-      val hadoopConf = session.sessionState.newHadoopConf()
-      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get table size from hdfs. Using the default size, $defaultSize.", e)
-        defaultSize
-    }
-  }
}
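The helper removed above is a thin wrapper over a standard Hadoop FileSystem call. For reference, a self-contained sketch of the same technique outside Spark, assuming only the Hadoop client on the classpath; `sizeOrDefault` and its inputs are illustrative names, not Spark API:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.util.control.NonFatal

    // Total bytes under a path, falling back to a caller-supplied default
    // on any non-fatal error (mirrors the shape of the removed helper).
    def sizeOrDefault(pathStr: String, defaultSize: Long): Long = {
      try {
        val path = new Path(pathStr)
        val fs: FileSystem = path.getFileSystem(new Configuration())
        fs.getContentSummary(path).getLength
      } catch {
        case NonFatal(e) =>
          Console.err.println(s"Failed to get size of $pathStr, using $defaultSize: $e")
          defaultSize
      }
    }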
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index abe6dd2..b2f73b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -21,12 +21,9 @@ import java.util.Locale
import scala.collection.mutable
-import org.apache.hadoop.fs.Path
-
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
import org.apache.spark.sql.types.{StructField, StructType}
@@ -90,13 +87,7 @@ case class HadoopFsRelation(
  override def sizeInBytes: Long = {
    val compressionFactor = sqlContext.conf.fileCompressionFactor
-    val defaultSize = (location.sizeInBytes * compressionFactor).toLong
-    location match {
-      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
-        CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
-          defaultSize)
-      case _ => defaultSize
-    }
+    (location.sizeInBytes * compressionFactor).toLong
  }
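With this hunk applied, `sizeInBytes` for a data source relation is again just the on-disk size from the file index scaled by the compression factor (exposed, as far as I can tell, as `spark.sql.sources.fileCompressionFactor`). A hedged arithmetic sketch with made-up numbers:

    // Hypothetical values, for illustration only.
    val onDiskBytes = 64L * 1024 * 1024   // pretend location.sizeInBytes is 64 MiB
    val compressionFactor = 3.0           // pretend fileCompressionFactor is 3.0
    val estimate = (onDiskBytes * compressionFactor).toLong
    // estimate == 201326592 (192 MiB): the optimizer plans against the
    // decompressed guess, not the raw file size.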
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index e942124..fee6f00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
  ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.hive.execution._
@@ -119,8 +119,16 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
        if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      val table = relation.tableMeta
      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
-          session.sessionState.conf.defaultSizeInBytes)
+        try {
+          val hadoopConf = session.sessionState.newHadoopConf()
+          val tablePath = new Path(table.location)
+          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+          fs.getContentSummary(tablePath).getLength
+        } catch {
+          case e: IOException =>
+            logWarning("Failed to get table size from hdfs.", e)
+            session.sessionState.conf.defaultSizeInBytes
+        }
      } else {
        session.sessionState.conf.defaultSizeInBytes
      }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1072d0b..3af8af0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1390,44 +1390,4 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
      assert(catalogStats.rowCount.isEmpty)
    }
  }
-
-  test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
-    withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
-      withTable("t1", "t2") {
-        sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-        sql("INSERT INTO t1 VALUES (1, 'a')")
-        checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
-        sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-        sql("INSERT INTO t2 VALUES (1, 'a')")
-        checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE t1.id=t2.id"),
-          "BroadcastHashJoin")
-      }
-    }
-  }
-
-  test("SPARK-25474: should not fall back to hdfs when table statistics exists" +
-    " for CatalogFileIndex dataSourceTable") {
-
-    var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
-    Seq(true, false).foreach { fallBackToHdfs =>
-      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) {
-        withTable("t1") {
-          sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-          sql("INSERT INTO t1 VALUES (1, 'a')")
-          // Analyze command updates the statistics of table `t1`
-          sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
-          val catalogTable = getCatalogTable("t1")
-          assert(catalogTable.stats.isDefined)
-
-          if (!fallBackToHdfs) {
-            sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          } else {
-            sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          }
-          checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
-        }
-      }
-    }
-    assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
-  }
}
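The deleted tests drove the fallback through EXPLAIN COST: with the fallback on, the scan's estimate was expected to come from HDFS rather than the 8.0 EiB default. A hedged spark-shell sketch reusing the deleted test's statements, assuming `spark` is in scope and the session can write to a warehouse directory:

    spark.sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
    spark.sql("INSERT INTO t1 VALUES (1, 'a')")
    // The optimized plan's statistics include sizeInBytes; after this revert a
    // partitioned data source table without ANALYZE stats can again surface
    // the 8.0 EiB default estimate here.
    spark.sql("EXPLAIN COST SELECT * FROM t1").show(truncate = false)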
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]