Repository: spark
Updated Branches:
  refs/heads/master bd4a21b48 -> ba891ec99


[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRelation's size

## What changes were proposed in this pull request?

As per the discussion in
https://github.com/apache/spark/pull/19864#discussion_r156847927:

the current HadoopFsRelation size estimate is based purely on the underlying
file size, which is not accurate and makes execution vulnerable to errors
such as OOM.

Users can enable CBO with the functionality in
https://github.com/apache/spark/pull/19864 to avoid this issue.

This JIRA proposes adding a configurable factor to the sizeInBytes method of
the HadoopFsRelation class so that users can mitigate the problem without CBO.
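
For illustration only (not part of this patch), a user scanning heavily
compressed Parquet data could raise the factor so that the optimizer's size
estimate better reflects the in-memory data size; the 3.0 below is an
arbitrary example value and the path is a placeholder:

```scala
// Illustrative usage of the new knob; assumes an existing SparkSession `spark`.
spark.conf.set("spark.sql.sources.fileCompressionFactor", "3.0")

// File-based relations read after this point report
// sizeInBytes = (file listing size) * 3.0 to the optimizer.
val df = spark.read.parquet("/path/to/compressed/table")
```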

## How was this patch tested?

Existing tests

Author: CodingCat <zhunans...@gmail.com>
Author: Nan Zhu <nan...@uber.com>

Closes #20072 from CodingCat/SPARK-22790.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba891ec9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba891ec9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba891ec9

Branch: refs/heads/master
Commit: ba891ec993c616dc4249fc786c56ea82ed04a827
Parents: bd4a21b
Author: CodingCat <zhunans...@gmail.com>
Authored: Sun Jan 14 02:36:32 2018 +0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Sun Jan 14 02:36:32 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/internal/SQLConf.scala | 13 ++++++-
 .../datasources/HadoopFsRelation.scala          |  6 ++-
 .../datasources/HadoopFsRelationSuite.scala     | 41 ++++++++++++++++++++
 3 files changed, 58 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ba891ec9/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 36e802a..6746fbc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -249,7 +249,7 @@ object SQLConf {
   val CONSTRAINT_PROPAGATION_ENABLED = buildConf("spark.sql.constraintPropagation.enabled")
     .internal()
     .doc("When true, the query optimizer will infer and propagate data constraints in the query " +
-      "plan to optimize them. Constraint propagation can sometimes be computationally expensive" +
+      "plan to optimize them. Constraint propagation can sometimes be computationally expensive " +
       "for certain kinds of query plans (such as those with a large number of predicates and " +
       "aliases) which might negatively impact overall runtime.")
     .booleanConf
@@ -263,6 +263,15 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
+  val FILE_COMRESSION_FACTOR = buildConf("spark.sql.sources.fileCompressionFactor")
+    .internal()
+    .doc("When estimating the output data size of a table scan, multiply the file size with this " +
+      "factor as the estimated data size, in case the data is compressed in the file and lead to" +
+      " a heavily underestimated result.")
+    .doubleConf
+    .checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0")
+    .createWithDefault(1.0)
+
   val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
     .doc("When true, the Parquet data source merges schemas collected from all data files, " +
          "otherwise the schema is picked from the summary file or a random data file " +
@@ -1255,6 +1264,8 @@ class SQLConf extends Serializable with Logging {
 
   def escapedStringLiterals: Boolean = getConf(ESCAPED_STRING_LITERALS)
 
+  def fileCompressionFactor: Double = getConf(FILE_COMRESSION_FACTOR)
+
   def stringRedationPattern: Option[Regex] = SQL_STRING_REDACTION_PATTERN.readFrom(reader)
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ba891ec9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 89d8a85..6b34638 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -82,7 +82,11 @@ case class HadoopFsRelation(
     }
   }
 
-  override def sizeInBytes: Long = location.sizeInBytes
+  override def sizeInBytes: Long = {
+    val compressionFactor = sqlContext.conf.fileCompressionFactor
+    (location.sizeInBytes * compressionFactor).toLong
+  }
+
 
   override def inputFiles: Array[String] = location.inputFiles
 }
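
In effect, the planner now compares this scaled estimate, rather than the raw
listing size, against thresholds such as spark.sql.autoBroadcastJoinThreshold.
A rough sketch with assumed numbers (a 4 MB compressed Parquet directory and a
factor of 3.0; neither figure comes from this patch):

    // Sketch only, mirroring the multiplication introduced above.
    val rawListingSize = 4L * 1024 * 1024            // bytes from the file listing
    val factor = 3.0                                 // spark.sql.sources.fileCompressionFactor
    val estimate = (rawListingSize * factor).toLong  // ~12 MB seen by the optimizer
    // Against the default 10 MB spark.sql.autoBroadcastJoinThreshold, the relation
    // is no longer treated as broadcastable, avoiding the OOM-prone broadcast that
    // the raw 4 MB figure would have permitted.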

http://git-wip-us.apache.org/repos/asf/spark/blob/ba891ec9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
index caf0388..c1f2c18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelationSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import java.io.{File, FilenameFilter}
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.test.SharedSQLContext
 
 class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
@@ -39,4 +40,44 @@ class HadoopFsRelationSuite extends QueryTest with SharedSQLContext {
       assert(df.queryExecution.logical.stats.sizeInBytes === BigInt(totalSize))
     }
   }
+
+  test("SPARK-22790: spark.sql.sources.compressionFactor takes effect") {
+    import testImplicits._
+    Seq(1.0, 0.5).foreach { compressionFactor =>
+      withSQLConf("spark.sql.sources.fileCompressionFactor" -> compressionFactor.toString,
+        "spark.sql.autoBroadcastJoinThreshold" -> "400") {
+        withTempPath { workDir =>
+          // the file size is 740 bytes
+          val workDirPath = workDir.getAbsolutePath
+          val data1 = Seq(100, 200, 300, 400).toDF("count")
+          data1.write.parquet(workDirPath + "/data1")
+          val df1FromFile = spark.read.parquet(workDirPath + "/data1")
+          val data2 = Seq(100, 200, 300, 400).toDF("count")
+          data2.write.parquet(workDirPath + "/data2")
+          val df2FromFile = spark.read.parquet(workDirPath + "/data2")
+          val joinedDF = df1FromFile.join(df2FromFile, Seq("count"))
+          if (compressionFactor == 0.5) {
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.nonEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.isEmpty)
+          } else {
+            // compressionFactor is 1.0
+            val bJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case bJoin: BroadcastHashJoinExec => bJoin
+            }
+            assert(bJoinExec.isEmpty)
+            val smJoinExec = joinedDF.queryExecution.executedPlan.collect {
+              case smJoin: SortMergeJoinExec => smJoin
+            }
+            assert(smJoinExec.nonEmpty)
+          }
+        }
+      }
+    }
+  }
 }
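
For reference, the arithmetic behind the expectations above: each side writes
roughly 740 bytes of Parquet and spark.sql.autoBroadcastJoinThreshold is set to
400. With a factor of 0.5 the estimated size is 740 * 0.5 = 370 bytes, below
the threshold, so a BroadcastHashJoinExec is planned; with a factor of 1.0 the
estimate stays at 740 bytes, above the threshold, and the planner falls back to
SortMergeJoinExec.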


