Repository: spark
Updated Branches:
  refs/heads/master 07737c87d -> 4a9c9d8f9


[SPARK-25159][SQL] json schema inference should only trigger one job

## What changes were proposed in this pull request?

This fixes a performance regression caused by https://github.com/apache/spark/pull/21376.

We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD 
partition. This is very expensive for RDDs with many small partitions.
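
For reference, here is a rough, self-contained sketch (not part of this patch) of how to observe the problem: it counts finished jobs with a `SparkListener`, the same trick the new regression test uses. The object name, local master, and partition count are illustrative assumptions.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.apache.spark.sql.SparkSession

object JobCountDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("job-count-demo").getOrCreate()
    val sc = spark.sparkContext

    // Count finished jobs, the same technique the new regression test uses.
    var numJobs = 0
    sc.addSparkListener(new SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = numJobs += 1
    })

    val rdd = sc.parallelize(1 to 100, numSlices = 20)

    // `toLocalIterator` fetches one partition at a time, submitting a separate job for each,
    // so draining the iterator fires roughly one job per partition.
    rdd.toLocalIterator.foreach(_ => ())

    // Listener events are delivered asynchronously; a short sleep is enough for a demo.
    Thread.sleep(2000)
    println(s"jobs triggered by toLocalIterator: $numJobs")

    spark.stop()
  }
}
```

With 20 partitions this reports on the order of 20 jobs, which is the per-partition overhead this PR removes from schema inference.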

To fix it, this PR introduces a way to access the SQLConf in the scheduler event 
loop thread, so that `JsonInferSchema` no longer needs `RDD#toLocalIterator`.
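
For illustration, a minimal sketch of the single-job fold pattern the fix is built on. `foldInOneJob` is a hypothetical helper, not code from this patch; the real change inlines this logic in `JsonInferSchema` and additionally wraps the driver-side merge in `SQLConf.withExistingConf` so that `SQLConf.get` keeps working on the DAGScheduler event loop thread (see the diff below).

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object SingleJobFold {
  // Fold an RDD with a single Spark job: every task folds its own partition, and the
  // per-partition results are merged on the driver by runJob's result handler. That
  // handler runs in the DAGScheduler event loop thread, which is why the actual patch
  // re-installs the captured SQLConf around the merge via `SQLConf.withExistingConf`.
  def foldInOneJob[T: ClassTag](sc: SparkContext, rdd: RDD[T], zero: T)(merge: (T, T) => T): T = {
    var acc = zero
    val foldPartition = (iter: Iterator[T]) => iter.fold(zero)(merge)
    val handleResult = (_: Int, partitionResult: T) => { acc = merge(acc, partitionResult) }
    sc.runJob(rdd, foldPartition, handleResult)
    acc
  }
}
```

For example, `SingleJobFold.foldInOneJob(sc, sc.parallelize(1 to 100, 10), 0)(_ + _)` computes the sum with exactly one job, assuming `zero` and `merge` are serializable.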

## How was this patch tested?

A new test in `DataFrameSuite` that verifies JSON schema inference triggers only one Spark job.

Closes #22152 from cloud-fan/conf.

Authored-by: Wenchen Fan <wenc...@databricks.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a9c9d8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a9c9d8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a9c9d8f

Branch: refs/heads/master
Commit: 4a9c9d8f9a8f8f165369e121d3b553a3515333d4
Parents: 07737c8
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Tue Aug 21 22:21:08 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Aug 21 22:21:08 2018 -0700

----------------------------------------------------------------------
 .../sql/catalyst/json/JsonInferSchema.scala     | 16 +++++++---
 .../org/apache/spark/sql/internal/SQLConf.scala | 33 ++++++++++++++++----
 .../org/apache/spark/sql/DataFrameSuite.scala   | 24 ++++++++++++++
 3 files changed, 63 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 5f70e06..9999a00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
       }.reduceOption(typeMerger).toIterator
     }
 
-    // Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-    // `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-    // active SparkSession and `SQLConf.get` may point to the wrong configs.
-    val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+    // Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+    // the fold functions in the scheduler event loop thread.
+    val existingConf = SQLConf.get
+    var rootType: DataType = StructType(Nil)
+    val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+    val mergeResult = (index: Int, taskResult: DataType) => {
+      rootType = SQLConf.withExistingConf(existingConf) {
+        typeMerger(rootType, taskResult)
+      }
+    }
+    json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult)
 
     canonicalizeType(rootType, configOptions) match {
       case Some(st: StructType) => st

http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5913c94..df2caff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -82,6 +82,19 @@ object SQLConf {
   /** See [[get]] for more information. */
   def getFallbackConf: SQLConf = fallbackConf.get()
 
+  private lazy val existingConf = new ThreadLocal[SQLConf] {
+    override def initialValue: SQLConf = null
+  }
+
+  def withExistingConf[T](conf: SQLConf)(f: => T): T = {
+    existingConf.set(conf)
+    try {
+      f
+    } finally {
+      existingConf.remove()
+    }
+  }
+
   /**
    * Defines a getter that returns the SQLConf within scope.
    * See [[get]] for more information.
@@ -116,16 +129,24 @@ object SQLConf {
     if (TaskContext.get != null) {
       new ReadOnlySQLConf(TaskContext.get())
     } else {
-      if (Utils.isTesting && SparkContext.getActive.isDefined) {
+      val isSchedulerEventLoopThread = SparkContext.getActive
+        .map(_.dagScheduler.eventProcessLoop.eventThread)
+        .exists(_.getId == Thread.currentThread().getId)
+      if (isSchedulerEventLoopThread) {
         // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
-        // will return `fallbackConf` which is unexpected. Here we prevent it from happening.
-        val schedulerEventLoopThread =
-          SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
-        if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
+        // will return `fallbackConf` which is unexpected. Here we require the caller to get the
+        // conf within `withExistingConf`, otherwise fail the query.
+        val conf = existingConf.get()
+        if (conf != null) {
+          conf
+        } else if (Utils.isTesting) {
           throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
+        } else {
+          confGetter.get()()
         }
+      } else {
+        confGetter.get()()
       }
-      confGetter.get()()
     }
   }
 
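
The pair added above (`existingConf` plus `withExistingConf`) is a standard thread-local propagation pattern: capture a per-thread value where it exists and re-install it around code that later runs on a thread that has none. A minimal generic sketch of the pattern, with made-up names that are not Spark APIs:

```scala
// Illustrative sketch only: `Propagated` and `withValue` are not Spark APIs.
object Propagated {
  private val current = new ThreadLocal[String] {
    override def initialValue(): String = null
  }

  // Like the new branch in SQLConf.get: prefer an explicitly installed value, else fall back.
  def get: String = Option(current.get()).getOrElse("fallback")

  // Like SQLConf.withExistingConf: install `value` for the duration of `body`, then clean up.
  def withValue[A](value: String)(body: => A): A = {
    current.set(value)
    try body finally current.remove()
  }
}
```

`JsonInferSchema` plays the caller's role here: it captures `SQLConf.get` on the planning thread and wraps the driver-side merge in `withExistingConf` before handing it to `runJob`.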

http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b0e22a5..7310087 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -27,6 +27,7 @@ import scala.util.Random
 import org.scalatest.Matchers._
 
 import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
@@ -2528,4 +2529,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       checkAnswer(aggPlusFilter1, aggPlusFilter2.collect())
     }
   }
+
+  test("SPARK-25159: json schema inference should only trigger one job") {
+    withTempPath { path =>
+      // This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which
+      // triggers one Spark job per RDD partition.
+      Seq(1 -> "a", 2 -> "b").toDF("i", "p")
+        // The data set has 2 partitions, so Spark will write at least 2 json files.
+        // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2
+        // partitions.
+        .write.partitionBy("p").option("compression", "gzip").json(path.getCanonicalPath)
+
+      var numJobs = 0
+      sparkContext.addSparkListener(new SparkListener {
+        override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+          numJobs += 1
+        }
+      })
+
+      val df = spark.read.json(path.getCanonicalPath)
+      assert(df.columns === Array("i", "p"))
+      assert(numJobs == 1)
+    }
+  }
 }

