This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 1472e66 [SPARK-28716][SQL] Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans 1472e66 is described below commit 1472e664baf74d60d88d8509cfd7fc3d8c48b2bf Author: Ali Afroozeh <ali.afroo...@databricks.com> AuthorDate: Fri Aug 23 13:29:32 2019 +0200 [SPARK-28716][SQL] Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans ## What changes were proposed in this pull request? Add id to Exchange and Subquery's stringArgs method for easier identifying their reuses in query plans, for example: ``` ReusedExchange d_date_sk#827, BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#2710] ``` Where `2710` is the id of the reused exchange. ## How was this patch tested? Passes existing tests Closes #25434 from dbaliafroozeh/ImplementStringArgsExchangeSubqueryExec. Authored-by: Ali Afroozeh <ali.afroo...@databricks.com> Signed-off-by: herman <her...@databricks.com> --- .../src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 8 ++++++++ .../org/apache/spark/sql/execution/basicPhysicalOperators.scala | 2 ++ .../scala/org/apache/spark/sql/execution/exchange/Exchange.scala | 2 ++ .../org/apache/spark/sql/execution/debug/DebuggingSuite.scala | 4 ++-- 4 files changed, 14 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 2baf2e5..ba89ba7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import java.util.concurrent.atomic.AtomicInteger import 
scala.collection.mutable.ArrayBuffer @@ -46,6 +47,11 @@ object SparkPlan { /** The [[LogicalPlan]] inherited from its ancestor. */ val LOGICAL_PLAN_INHERITED_TAG = TreeNodeTag[LogicalPlan]("logical_plan_inherited") + + private val nextPlanId = new AtomicInteger(0) + + /** Register a new SparkPlan, returning its SparkPlan ID */ + private[execution] def newPlanId(): Int = nextPlanId.getAndIncrement() } /** @@ -64,6 +70,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def sparkContext = sqlContext.sparkContext + val id: Int = SparkPlan.newPlanId() + // sqlContext will be null when SparkPlan nodes are created without the active sessions. val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) { sqlContext.conf.subexpressionEliminationEnabled diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 7204548..b74dd95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -731,6 +731,8 @@ case class SubqueryExec(name: String, child: SparkPlan) override def executeCollect(): Array[InternalRow] = { ThreadUtils.awaitResult(relationFuture, Duration.Inf) } + + override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[id=#$id]") } object SubqueryExec { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala index ea0778b..153645d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala @@ -40,6 +40,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch */ abstract class Exchange extends UnaryExecNode { 
override def output: Seq[Attribute] = child.output + + override def stringArgs: Iterator[Any] = super.stringArgs ++ Iterator(s"[id=#$id]") } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala index aaf1fe4..7a8da7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala @@ -61,8 +61,8 @@ class DebuggingSuite extends SharedSparkSession { } val output = captured.toString() - assert(output.contains( - """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])) == + assert(output.replaceAll("\\[id=#\\d+\\]", "[id=#x]").contains( + """== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#x] == |Tuples output: 0 | id LongType: {} |== WholeStageCodegen == --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org