This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6a3385e27fa [SPARK-47044][SQL] Add executed query for JDBC external datasources to explain output
e6a3385e27fa is described below

commit e6a3385e27fa95391433ea02fa053540fe101d40
Author: Uros Stankovic <uros.stanko...@databricks.com>
AuthorDate: Tue Feb 20 22:03:28 2024 +0800

    [SPARK-47044][SQL] Add executed query for JDBC external datasources to explain output
    
    ### What changes were proposed in this pull request?
    Add the generated JDBC query to the EXPLAIN FORMATTED output when the physical Scan node accesses a JDBC source to create its RDD.
    
    Output of EXPLAIN FORMATTED with this change, taken from the newly added test:
    ```
    == Physical Plan ==
    * Project (2)
    +- * Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d  (1)

    (1) Scan org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d  [codegen id : 1]
    Output [1]: [MAX(ID)#x]
    Arguments: [MAX(ID)#x], [StructField(MAX(ID),IntegerType,true)], PushedDownOperators(Some(org.apache.spark.sql.connector.expressions.aggregate.Aggregation647d3279),None,None,None,List(),ArraySeq(ID IS NOT NULL, ID > 1)), JDBCRDD[0] at $anonfun$executePhase$2 at LexicalThreadLocal.scala:63, org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCScan$$anon$14349389d, Statistics(sizeInBytes=8.0 EiB, ColumnStat: N/A)
    External engine query: SELECT MAX("ID") FROM "test"."people"  WHERE ("ID" IS NOT NULL) AND ("ID" > 1)
    
    (2) Project [codegen id : 1]
    Output [1]: [MAX(ID)#x AS max(id)#x]
    Input [1]: [MAX(ID)#x]
    ```
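
    For reference, output of this shape can be reproduced as in the sketch below. It assumes a JDBC (H2) catalog named `h2` has already been registered, the way the test suite sets one up; the query itself is the one used by the new test.
    ```scala
    // Sketch only: assumes `spark` is a SparkSession whose `h2` catalog points at
    // the H2 test database, mirroring the JDBCV2Suite setup.
    val df = spark.sql("SELECT max(id) FROM h2.test.people WHERE id > 1")
    // The formatted plan now carries the "External engine query:" field for the
    // JDBC scan node.
    df.explain("formatted")
    ```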
    
    ### Why are the changes needed?
    The EXPLAIN FORMATTED command will now let customers see which query text is sent to external JDBC sources.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    Customers will see an additional field in the EXPLAIN FORMATTED output for the RowDataSourceScanExec node.
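
    The new field can also be checked programmatically, as in this sketch; `FormattedMode` and `QueryExecution.explainString` are pre-existing Spark APIs (not part of this patch), and the query is the one from the new test.
    ```scala
    import org.apache.spark.sql.execution.FormattedMode

    val df = spark.sql("SELECT max(id) FROM h2.test.people WHERE id > 1")
    // explainString(FormattedMode) returns the same text as EXPLAIN FORMATTED;
    // with this change it contains the "External engine query: ..." line for
    // the RowDataSourceScanExec node.
    val plan = df.queryExecution.explainString(FormattedMode)
    assert(plan.contains("External engine query:"))
    ```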
    
    ### How was this patch tested?
    Tested with a new unit test in the JDBC V2 suite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #45102 from urosstan-db/add-sql-query-for-external-datasources.
    
    Authored-by: Uros Stankovic <uros.stanko...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  8 ++--
 .../spark/sql/execution/DataSourceScanExec.scala   | 10 ++++
 .../datasources/ExternalEngineDatasourceRDD.scala  | 26 ++++++++++
 .../sql/execution/datasources/jdbc/JDBCRDD.scala   | 56 ++++++++++++----------
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    |  7 +++
 5 files changed, 78 insertions(+), 29 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index dbacb833ef59..10e2718da833 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -1000,12 +1000,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
 
     val str = if (verbose) {
       if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields)
+    } else if (printNodeId) {
+      simpleStringWithNodeId()
     } else {
-      if (printNodeId) {
-        simpleStringWithNodeId()
-      } else {
-        simpleString(maxFields)
-      }
+      simpleString(maxFields)
     }
     append(prefix)
     append(str)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index ec265f4eaea4..474d65a251ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -127,6 +127,16 @@ case class RowDataSourceScanExec(
     }
   }
 
+  override def verboseStringWithOperatorId(): String = {
+    super.verboseStringWithOperatorId() + (rdd match {
+      case externalEngineDatasourceRdd: ExternalEngineDatasourceRDD =>
+        "External engine query: " +
+          externalEngineDatasourceRdd.getExternalEngineQuery +
+          System.lineSeparator()
+      case _ => ""
+    })
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala
new file mode 100644
index 000000000000..14ca824596f9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ExternalEngineDatasourceRDD.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+/**
+ * Represents a trait that should be implemented by relations which
+ * access external database engines
+ */
+trait ExternalEngineDatasourceRDD {
+  def getExternalEngineQuery: String
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
index a436627fd117..395d4a339d90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.expressions.filter.Predicate
-import org.apache.spark.sql.execution.datasources.DataSourceMetricsMixin
+import org.apache.spark.sql.execution.datasources.{DataSourceMetricsMixin, ExternalEngineDatasourceRDD}
 import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
@@ -173,7 +173,7 @@ class JDBCRDD(
     limit: Int,
     sortOrders: Array[String],
     offset: Int)
-  extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin {
+  extends RDD[InternalRow](sc, Nil) with DataSourceMetricsMixin with ExternalEngineDatasourceRDD {
 
   /**
    * Execution time of the query issued to JDBC connection
@@ -182,11 +182,40 @@ class JDBCRDD(
     sparkContext,
     name = "JDBC query execution time")
 
+  private lazy val dialect = JdbcDialects.get(url)
+
+  def generateJdbcQuery(partition: Option[JDBCPartition]): String = {
+    // H2's JDBC driver does not support the setSchema() method.  We pass a
+    // fully-qualified table name in the SELECT statement.  I don't know how to
+    // talk about a table in a completely portable way.
+    var builder = dialect
+      .getJdbcSQLQueryBuilder(options)
+      .withPredicates(predicates, partition.getOrElse(JDBCPartition(whereClause = null, idx = 1)))
+      .withColumns(columns)
+      .withSortOrders(sortOrders)
+      .withLimit(limit)
+      .withOffset(offset)
+
+    groupByColumns.foreach { groupByKeys =>
+      builder = builder.withGroupByColumns(groupByKeys)
+    }
+
+    sample.foreach { tableSampleInfo =>
+      builder = builder.withTableSample(tableSampleInfo)
+    }
+
+    builder.build()
+  }
+
   /**
    * Retrieve the list of partitions corresponding to this RDD.
    */
   override def getPartitions: Array[Partition] = partitions
 
+  override def getExternalEngineQuery: String = {
+    generateJdbcQuery(partition = None)
+  }
+
   /**
    * Runs the SQL query against the JDBC driver.
    *
@@ -236,7 +265,6 @@ class JDBCRDD(
     val inputMetrics = context.taskMetrics().inputMetrics
     val part = thePart.asInstanceOf[JDBCPartition]
     conn = getConnection(part.idx)
-    val dialect = JdbcDialects.get(url)
     import scala.jdk.CollectionConverters._
     dialect.beforeFetch(conn, options.asProperties.asScala.toMap)
 
@@ -256,27 +284,7 @@ class JDBCRDD(
       case None =>
     }
 
-    // H2's JDBC driver does not support the setSchema() method.  We pass a
-    // fully-qualified table name in the SELECT statement.  I don't know how to
-    // talk about a table in a completely portable way.
-
-    var builder = dialect
-      .getJdbcSQLQueryBuilder(options)
-      .withColumns(columns)
-      .withPredicates(predicates, part)
-      .withSortOrders(sortOrders)
-      .withLimit(limit)
-      .withOffset(offset)
-
-    groupByColumns.foreach { groupByKeys =>
-      builder = builder.withGroupByColumns(groupByKeys)
-    }
-
-    sample.foreach { tableSampleInfo =>
-      builder = builder.withTableSample(tableSampleInfo)
-    }
-
-    val sqlText = builder.build()
+    val sqlText = generateJdbcQuery(Some(part))
     stmt = conn.prepareStatement(sqlText,
         ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
     stmt.setFetchSize(options.fetchSize)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index c258da07cac9..7bae2d77a161 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalo
 import org.apache.spark.sql.connector.catalog.functions.{ScalarFunction, UnboundFunction}
 import org.apache.spark.sql.connector.catalog.index.SupportsIndex
 import org.apache.spark.sql.connector.expressions.Expression
+import org.apache.spark.sql.execution.FormattedMode
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, V1ScanWrapper}
 import org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog
 import org.apache.spark.sql.functions.{abs, acos, asin, atan, atan2, avg, ceil, coalesce, cos, cosh, cot, count, count_distinct, degrees, exp, floor, lit, log => logarithm, log10, not, pow, radians, round, signum, sin, sinh, sqrt, sum, tan, tanh, udf, when}
@@ -3021,4 +3022,10 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
       JdbcDialects.registerDialect(H2Dialect)
     }
   }
+
+  test("Explain shows executed SQL query") {
+    val df = sql("SELECT max(id) FROM h2.test.people WHERE id > 1")
+    val explained = getNormalizedExplain(df, FormattedMode)
+    assert(explained.contains("External engine query:"))
+  }
 }


