This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6b33a  [SPARK-28939][SQL] Propagate SQLConf for plans executed by 
toRdd
3d6b33a is described below

commit 3d6b33a49a8daba17973994169ee4a9e2507a6d9
Author: Marco Gaido <marcogaid...@gmail.com>
AuthorDate: Mon Sep 9 21:20:34 2019 +0800

    [SPARK-28939][SQL] Propagate SQLConf for plans executed by toRdd
    
    ### What changes were proposed in this pull request?
    
    The PR proposes to create a custom `RDD` which enables to propagate 
`SQLConf` also in cases not tracked by SQL execution, as it happens when a 
`Dataset` is converted to and RDD either using `.rdd` or 
`.queryExecution.toRdd` and then the returned RDD is used to invoke actions on 
it.
    
    In this way, SQL configs are effective also in these cases, while earlier 
they were ignored.
    
    ### Why are the changes needed?
    
    Without this patch, all the times `.rdd` or `.queryExecution.toRdd` are 
used, all the SQL configs set are ignored. An example of a reproducer can be:
    ```
      withSQLConf(SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key, "false") {
        val df = spark.range(2).selectExpr((0 to 5000).map(i => s"id as 
field_$i"): _*)
        df.createOrReplaceTempView("spark64kb")
        val data = spark.sql("select * from spark64kb limit 10")
        // Subexpression elimination is used here, despite it should have been 
disabled
        data.describe()
      }
    ```
    
    ### Does this PR introduce any user-facing change?
    
    When a user calls `.queryExecution.toRdd`, a `SQLExecutionRDD` is returned 
wrapping the `RDD` of the execute. When `.rdd` is used, an additional 
`SQLExecutionRDD` is present in the hierarchy.
    
    ### How was this patch tested?
    
    added UT
    
    Closes #25643 from mgaido91/SPARK-28939.
    
    Authored-by: Marco Gaido <marcogaid...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 11 +++-
 .../spark/sql/execution/QueryExecution.scala       |  3 +-
 .../spark/sql/execution/SQLExecutionRDD.scala      | 64 ++++++++++++++++++++++
 .../sql/internal/ExecutorSideSQLConfSuite.scala    | 46 +++++++++++++++-
 4 files changed, 119 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6c6cca8..d9b0a72 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -115,7 +115,9 @@ object SQLConf {
    * Returns the active config object within the current scope. If there is an 
active SparkSession,
    * the proper SQLConf associated with the thread's active session is used. 
If it's called from
    * tasks in the executor side, a SQLConf will be created from job local 
properties, which are set
-   * and propagated from the driver side.
+   * and propagated from the driver side, unless a `SQLConf` has been set in 
the scope by
+   * `withExistingConf` as done for propagating SQLConf for operations 
performed on RDDs created
+   * from DataFrames.
    *
    * The way this works is a little bit convoluted, due to the fact that 
config was added initially
    * only for physical plans (and as a result not in sql/catalyst module).
@@ -129,7 +131,12 @@ object SQLConf {
    */
   def get: SQLConf = {
     if (TaskContext.get != null) {
-      new ReadOnlySQLConf(TaskContext.get())
+      val conf = existingConf.get()
+      if (conf != null) {
+        conf
+      } else {
+        new ReadOnlySQLConf(TaskContext.get())
+      }
     } else {
       val isSchedulerEventLoopThread = SparkContext.getActive
         .map(_.dagScheduler.eventProcessLoop.eventThread)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index e5e86db..630d062 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -105,7 +105,8 @@ class QueryExecution(
    * Given QueryExecution is not a public class, end users are discouraged to 
use this: please
    * use `Dataset.rdd` instead where conversion will be applied.
    */
-  lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
+  lazy val toRdd: RDD[InternalRow] = new SQLExecutionRDD(
+    executedPlan.execute(), sparkSession.sessionState.conf)
 
   /**
    * Prepares a planned [[SparkPlan]] for execution by inserting shuffle 
operations and internal
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
new file mode 100644
index 0000000..7373da3
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecutionRDD.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * It is just a wrapper over `sqlRDD`, which sets and makes effective all the 
configs from the
+ * captured `SQLConf`.
+ * Please notice that this means we may miss configurations set after the 
creation of this RDD and
+ * before its execution.
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLExecutionRDD(
+    var sqlRDD: RDD[InternalRow], @transient conf: SQLConf) extends 
RDD[InternalRow](sqlRDD) {
+  private val sqlConfigs = conf.getAllConfs
+  private lazy val sqlConfExecutorSide = {
+    val props = new Properties()
+    props.putAll(sqlConfigs.asJava)
+    val newConf = new SQLConf()
+    newConf.setConf(props)
+    newConf
+  }
+
+  override val partitioner = firstParent[InternalRow].partitioner
+
+  override def getPartitions: Array[Partition] = 
firstParent[InternalRow].partitions
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
+    // If we are in the context of a tracked SQL operation, 
`SQLExecution.EXECUTION_ID_KEY` is set
+    // and we have nothing to do here. Otherwise, we use the `SQLConf` 
captured at the creation of
+    // this RDD.
+    if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+      SQLConf.withExistingConf(sqlConfExecutorSide) {
+        firstParent[InternalRow].iterator(split, context)
+      }
+    } else {
+      firstParent[InternalRow].iterator(split, context)
+    }
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
index d885348..94b73ec 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
@@ -17,8 +17,13 @@
 
 package org.apache.spark.sql.internal
 
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlan}
 import org.apache.spark.sql.execution.debug.codegenStringSeq
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SQLTestUtils
@@ -102,4 +107,41 @@ class ExecutorSideSQLConfSuite extends SparkFunSuite with 
SQLTestUtils {
       }
     }
   }
+
+  test("SPARK-28939: propagate SQLConf also in conversions to RDD") {
+    val confs = Seq("spark.sql.a" -> "x", "spark.sql.b" -> "y")
+    val physicalPlan = SQLConfAssertPlan(confs)
+    val dummyQueryExecution = FakeQueryExecution(spark, physicalPlan)
+    withSQLConf(confs: _*) {
+      // Force RDD evaluation to trigger asserts
+      dummyQueryExecution.toRdd.collect()
+    }
+    val dummyQueryExecution1 = FakeQueryExecution(spark, physicalPlan)
+    // Without setting the configs assertions fail
+    val e = intercept[SparkException](dummyQueryExecution1.toRdd.collect())
+    assert(e.getCause.isInstanceOf[NoSuchElementException])
+  }
+}
+
+case class SQLConfAssertPlan(confToCheck: Seq[(String, String)]) extends 
LeafExecNode {
+  override protected def doExecute(): RDD[InternalRow] = {
+    sqlContext
+      .sparkContext
+      .parallelize(0 until 2, 2)
+      .mapPartitions { it =>
+        val confs = SQLConf.get
+        confToCheck.foreach { case (key, expectedValue) =>
+          assert(confs.getConfString(key) == expectedValue)
+        }
+        it.map(i => InternalRow.fromSeq(Seq(i)))
+      }
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
+
+case class FakeQueryExecution(spark: SparkSession, physicalPlan: SparkPlan)
+    extends QueryExecution(spark, LocalRelation()) {
+  override lazy val sparkPlan: SparkPlan = physicalPlan
+  override lazy val executedPlan: SparkPlan = physicalPlan
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to