This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new d97d0ac  Revert "[SPARK-25474][SQL][2.3] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"
d97d0ac is described below

commit d97d0acd4f18edb9a4b4365531a06924675e789d
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Aug 18 07:45:24 2019 -0700

    Revert "[SPARK-25474][SQL][2.3] Support 
`spark.sql.statistics.fallBackToHdfs` in data source tables"
    
    This reverts commit 416aba48b40d931da97e549fcffa86c47790e5f2.
---
 .../spark/sql/execution/command/CommandUtils.scala | 11 ------
 .../execution/datasources/HadoopFsRelation.scala   | 11 +-----
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++++++--
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 40 ----------------------
 4 files changed, 12 insertions(+), 64 deletions(-)
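
For context: the reverted change (SPARK-25474) let data source tables honor
`spark.sql.statistics.fallBackToHdfs`, which, when a table has no catalog
statistics, estimates its size by querying the file system instead of assuming
the default size. A minimal sketch of toggling the flag (assuming an active
SparkSession named `spark`):

    // The conf key is taken from the commit title. With this revert applied,
    // only Hive tables honor it on branch-2.3; data source tables do not.
    spark.conf.set("spark.sql.statistics.fallBackToHdfs", "true")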

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 7f1c6a9..c270486 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -137,15 +137,4 @@ object CommandUtils extends Logging {
     }
     newStats
   }
-
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, defaultSize: Long): Long = {
-    try {
-      val hadoopConf = session.sessionState.newHadoopConf()
-      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get table size from hdfs. Using the default 
size, $defaultSize.", e)
-        defaultSize
-    }
-  }
 }
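
The deleted helper above asked the file system for the total byte count under
the table path, logging and falling back to a supplied default on any
non-fatal error. A self-contained sketch of that logic (object and method
names here are illustrative, not Spark API):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    import scala.util.control.NonFatal

    object SizeOnFs {
      // Total length in bytes of all files under `path`, or `defaultSize`
      // when the file system call fails (path missing, FS unreachable, ...).
      def sizeOrDefault(path: Path, conf: Configuration, defaultSize: Long): Long =
        try {
          path.getFileSystem(conf).getContentSummary(path).getLength
        } catch {
          case NonFatal(_) => defaultSize
        }
    }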
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index abe6dd2..b2f73b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -21,12 +21,9 @@ import java.util.Locale
 
 import scala.collection.mutable
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -90,13 +87,7 @@ case class HadoopFsRelation(
 
   override def sizeInBytes: Long = {
     val compressionFactor = sqlContext.conf.fileCompressionFactor
-    val defaultSize = (location.sizeInBytes * compressionFactor).toLong
-    location match {
-      case cfi: CatalogFileIndex if sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
-        CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new Path(cfi.table.location),
-          defaultSize)
-      case _ => defaultSize
-    }
+    (location.sizeInBytes * compressionFactor).toLong
   }
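
With the fallback branch removed, HadoopFsRelation.sizeInBytes is again just
the size reported by the file index scaled by
`spark.sql.sources.fileCompressionFactor` (compressed on-disk bytes understate
the in-memory size, so the planner may scale them up). A toy illustration of
the arithmetic, with made-up numbers:

    // With a compression factor of 3.0, 1 GiB on disk is estimated as
    // 3 GiB for planning purposes (e.g. broadcast-join decisions).
    val onDiskBytes = 1L << 30                                // 1 GiB
    val compressionFactor = 3.0
    val estimated = (onDiskBytes * compressionFactor).toLong  // 3221225472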
 
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index e942124..fee6f00 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
     ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CommandUtils, CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.execution._
@@ -119,8 +119,16 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
         if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        CommandUtils.getSizeInBytesFallBackToHdfs(session, new Path(table.location),
-          session.sessionState.conf.defaultSizeInBytes)
+        try {
+          val hadoopConf = session.sessionState.newHadoopConf()
+          val tablePath = new Path(table.location)
+          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+          fs.getContentSummary(tablePath).getLength
+        } catch {
+          case e: IOException =>
+            logWarning("Failed to get table size from hdfs.", e)
+            session.sessionState.conf.defaultSizeInBytes
+        }
       } else {
         session.sessionState.conf.defaultSizeInBytes
       }
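
The hunk above restores the pre-SPARK-25474 inline logic in
DetermineTableStats: it catches only IOException, whereas the deleted shared
helper caught any NonFatal throwable. A standalone sketch of the restored
behavior (method and parameter names are illustrative):

    import java.io.IOException
    import java.net.URI

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Size a Hive table from its location, using the default on I/O failure.
    def hiveTableSizeInBytes(location: URI, conf: Configuration, defaultSize: Long): Long =
      try {
        val tablePath = new Path(location)
        val fs: FileSystem = tablePath.getFileSystem(conf)
        fs.getContentSummary(tablePath).getLength
      } catch {
        case _: IOException => defaultSize
      }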
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 1072d0b..3af8af0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1390,44 +1390,4 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       assert(catalogStats.rowCount.isEmpty)
     }
   }
-
-  test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
-    withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
-      withTable("t1", "t2") {
-        sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED 
BY (name)")
-        sql("INSERT INTO t1 VALUES (1, 'a')")
-        checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), 
"sizeInBytes=8.0 EiB")
-        sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED 
BY (name)")
-        sql("INSERT INTO t2 VALUES (1, 'a')")
-        checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE 
t1.id=t2.id"),
-          "BroadcastHashJoin")
-      }
-    }
-  }
-
-  test("SPARK-25474: should not fall back to hdfs when table statistics 
exists" +
-    " for CatalogFileIndex dataSourceTable") {
-
-    var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
-    Seq(true, false).foreach { fallBackToHdfs =>
-      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> fallBackToHdfs.toString) {
-        withTable("t1") {
-          sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED BY (name)")
-          sql("INSERT INTO t1 VALUES (1, 'a')")
-          // Analyze command updates the statistics of table `t1`
-          sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
-          val catalogTable = getCatalogTable("t1")
-          assert(catalogTable.stats.isDefined)
-
-          if (!fallBackToHdfs) {
-            sizeInBytesDisabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          } else {
-            sizeInBytesEnabledFallBack = catalogTable.stats.get.sizeInBytes.toLong
-          }
-          checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), "sizeInBytes=8.0 EiB")
-        }
-      }
-    }
-    assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
-  }
 }
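
A note on the removed tests: they grep the EXPLAIN COST output for
"sizeInBytes=8.0 EiB" because an unsized relation falls back to
`spark.sql.defaultSizeInBytes`, whose default is Long.MaxValue, and
2^63 - 1 bytes prints as 8.0 EiB. A quick check of that arithmetic:

    // Long.MaxValue expressed in exbibytes (1 EiB = 1024^6 bytes).
    val eib = Long.MaxValue.toDouble / math.pow(1024, 6)
    println(f"$eib%.1f EiB")  // prints "8.0 EiB"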

