This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 07c4b9b  Revert "[SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"
07c4b9b is described below

commit 07c4b9bd1fb055f283af076b2a995db8f6efe7a5
Author: Xiao Li <gatorsm...@gmail.com>
AuthorDate: Fri Aug 23 07:41:39 2019 -0700

    Revert "[SPARK-25474][SQL] Support `spark.sql.statistics.fallBackToHdfs` in data source tables"
    
    This reverts commit 485ae6d1818e8756a86da38d6aefc8f1dbde49c2.
    
    Closes #25563 from gatorsmile/revert.
    
    Authored-by: Xiao Li <gatorsm...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/execution/command/CommandUtils.scala | 11 ------
 .../execution/datasources/HadoopFsRelation.scala   | 13 +++----
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 14 ++++++--
 .../apache/spark/sql/hive/StatisticsSuite.scala    | 40 ----------------------
 4 files changed, 15 insertions(+), 63 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 9a9d66b..b644e6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -344,15 +344,4 @@ object CommandUtils extends Logging {
   private def isDataPath(path: Path, stagingDir: String): Boolean = {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
-
-  def getSizeInBytesFallBackToHdfs(session: SparkSession, path: Path, 
defaultSize: Long): Long = {
-    try {
-      val hadoopConf = session.sessionState.newHadoopConf()
-      path.getFileSystem(hadoopConf).getContentSummary(path).getLength
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to get table size from hdfs. Using the default 
size, $defaultSize.", e)
-        defaultSize
-    }
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index f7d2315..d278802 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import org.apache.hadoop.fs.Path
+import java.util.Locale
+
+import scala.collection.mutable
 
 import org.apache.spark.sql.{SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -70,13 +71,7 @@ case class HadoopFsRelation(
 
   override def sizeInBytes: Long = {
     val compressionFactor = sqlContext.conf.fileCompressionFactor
-    val defaultSize = (location.sizeInBytes * compressionFactor).toLong
-    location match {
-      case cfi: CatalogFileIndex if 
sparkSession.sessionState.conf.fallBackToHdfsForStatsEnabled =>
-        CommandUtils.getSizeInBytesFallBackToHdfs(sparkSession, new 
Path(cfi.table.location),
-          defaultSize)
-      case _ => defaultSize
-    }
+    (location.sizeInBytes * compressionFactor).toLong
   }
 
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d09c0ab..7b28e4f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -30,7 +30,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
     ScriptTransformation}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{CommandUtils, 
CreateTableCommand, DDLUtils}
+import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -118,8 +118,16 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
         if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.isEmpty =>
       val table = relation.tableMeta
       val sizeInBytes = if 
(session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
-        CommandUtils.getSizeInBytesFallBackToHdfs(session, new 
Path(table.location),
-          session.sessionState.conf.defaultSizeInBytes)
+        try {
+          val hadoopConf = session.sessionState.newHadoopConf()
+          val tablePath = new Path(table.location)
+          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
+          fs.getContentSummary(tablePath).getLength
+        } catch {
+          case e: IOException =>
+            logWarning("Failed to get table size from hdfs.", e)
+            session.sessionState.conf.defaultSizeInBytes
+        }
       } else {
         session.sessionState.conf.defaultSizeInBytes
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e20099a..b4e5058 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -1484,44 +1484,4 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
       }
     }
   }
-
-  test("SPARK-25474: test sizeInBytes for CatalogFileIndex dataSourceTable") {
-    withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> "true") {
-      withTable("t1", "t2") {
-        sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED 
BY (name)")
-        sql("INSERT INTO t1 VALUES (1, 'a')")
-        checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), 
"sizeInBytes=8.0 EiB")
-        sql("CREATE TABLE t2 (id INT, name STRING) USING PARQUET PARTITIONED 
BY (name)")
-        sql("INSERT INTO t2 VALUES (1, 'a')")
-        checkKeywordsExist(sql("EXPLAIN SELECT * FROM t1, t2 WHERE 
t1.id=t2.id"),
-          "BroadcastHashJoin")
-      }
-    }
-  }
-
-  test("SPARK-25474: should not fall back to hdfs when table statistics 
exists" +
-    " for CatalogFileIndex dataSourceTable") {
-
-    var sizeInBytesDisabledFallBack, sizeInBytesEnabledFallBack = 0L
-    Seq(true, false).foreach { fallBackToHdfs =>
-      withSQLConf(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key -> 
fallBackToHdfs.toString) {
-        withTable("t1") {
-          sql("CREATE TABLE t1 (id INT, name STRING) USING PARQUET PARTITIONED 
BY (name)")
-          sql("INSERT INTO t1 VALUES (1, 'a')")
-          // Analyze command updates the statistics of table `t1`
-          sql("ANALYZE TABLE t1 COMPUTE STATISTICS")
-          val catalogTable = getCatalogTable("t1")
-          assert(catalogTable.stats.isDefined)
-
-          if (!fallBackToHdfs) {
-            sizeInBytesDisabledFallBack = 
catalogTable.stats.get.sizeInBytes.toLong
-          } else {
-            sizeInBytesEnabledFallBack = 
catalogTable.stats.get.sizeInBytes.toLong
-          }
-          checkKeywordsNotExist(sql("EXPLAIN COST SELECT * FROM t1"), 
"sizeInBytes=8.0 EiB")
-        }
-      }
-    }
-    assert(sizeInBytesEnabledFallBack === sizeInBytesDisabledFallBack)
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to