This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1472e66  [SPARK-28716][SQL] Add id to Exchange and Subquery's 
stringArgs method for easier identifying their reuses in query plans
1472e66 is described below

commit 1472e664baf74d60d88d8509cfd7fc3d8c48b2bf
Author: Ali Afroozeh <ali.afroo...@databricks.com>
AuthorDate: Fri Aug 23 13:29:32 2019 +0200

    [SPARK-28716][SQL] Add id to Exchange and Subquery's stringArgs method for 
easier identifying their reuses in query plans
    
    ## What changes were proposed in this pull request?
    
    Add the plan id to the `stringArgs` output of Exchange and SubqueryExec to make
    it easier to identify their reuses in query plans. For example:
    ```
    ReusedExchange d_date_sk#827, BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))) [id=#2710]
    ```
    Where `2710` is the id of the reused exchange.
    
    ## How was this patch tested?
    
    Passes existing tests
    
    Closes #25434 from dbaliafroozeh/ImplementStringArgsExchangeSubqueryExec.
    
    Authored-by: Ali Afroozeh <ali.afroo...@databricks.com>
    Signed-off-by: herman <her...@databricks.com>
---
 .../src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 8 ++++++++
 .../org/apache/spark/sql/execution/basicPhysicalOperators.scala   | 2 ++
 .../scala/org/apache/spark/sql/execution/exchange/Exchange.scala  | 2 ++
 .../org/apache/spark/sql/execution/debug/DebuggingSuite.scala     | 4 ++--
 4 files changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 2baf2e5..ba89ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, 
DataOutputStream}
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -46,6 +47,11 @@ object SparkPlan {
 
   /** The [[LogicalPlan]] inherited from its ancestor. */
   val LOGICAL_PLAN_INHERITED_TAG = 
TreeNodeTag[LogicalPlan]("logical_plan_inherited")
+
+  private val nextPlanId = new AtomicInteger(0)
+
+  /** Register a new SparkPlan, returning its SparkPlan ID */
+  private[execution] def newPlanId(): Int = nextPlanId.getAndIncrement()
 }
 
 /**
@@ -64,6 +70,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with 
Logging with Serializ
 
   protected def sparkContext = sqlContext.sparkContext
 
+  val id: Int = SparkPlan.newPlanId()
+
   // sqlContext will be null when SparkPlan nodes are created without the 
active sessions.
   val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
     sqlContext.conf.subexpressionEliminationEnabled
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 7204548..b74dd95 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -731,6 +731,8 @@ case class SubqueryExec(name: String, child: SparkPlan)
   override def executeCollect(): Array[InternalRow] = {
     ThreadUtils.awaitResult(relationFuture, Duration.Inf)
   }
+
+  override def stringArgs: Iterator[Any] = super.stringArgs ++ 
Iterator(s"[id=#$id]")
 }
 
 object SubqueryExec {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
index ea0778b..153645d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
@@ -40,6 +40,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  */
 abstract class Exchange extends UnaryExecNode {
   override def output: Seq[Attribute] = child.output
+
+  override def stringArgs: Iterator[Any] = super.stringArgs ++ 
Iterator(s"[id=#$id]")
 }
 
 /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index aaf1fe4..7a8da7e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -61,8 +61,8 @@ class DebuggingSuite extends SharedSparkSession {
     }
 
     val output = captured.toString()
-    assert(output.contains(
-      """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
bigint, false])) ==
+    assert(output.replaceAll("\\[id=#\\d+\\]", "[id=#x]").contains(
+      """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
bigint, false])), [id=#x] ==
         |Tuples output: 0
         | id LongType: {}
         |== WholeStageCodegen ==


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to