Repository: spark
Updated Branches:
  refs/heads/branch-2.2 b7a2a16b1 -> 2405afce4
[SPARK-20872][SQL] ShuffleExchange.nodeName should handle null coordinator

## What changes were proposed in this pull request?

A one-liner change in `ShuffleExchange.nodeName` to cover the case when `coordinator` is `null`, so that the match expression is exhaustive. Please refer to [SPARK-20872](https://issues.apache.org/jira/browse/SPARK-20872) for a description of the symptoms. The TL;DR is that inspecting a `ShuffleExchange` (directly or transitively) on the Executor side can hit a case where its `coordinator` field is `null`, which triggers a `MatchError` in `ShuffleExchange.nodeName()`'s non-exhaustive match expression.

Two other matches on the `coordinator` field in `ShuffleExchange` were changed in the same way for consistency.

## How was this patch tested?

Manually tested this change with a case where `coordinator` is null to make sure `ShuffleExchange.nodeName` doesn't throw a `MatchError` anymore.

Author: Kris Mok <[email protected]>

Closes #18095 from rednaxelafx/shuffleexchange-nodename.

(cherry picked from commit c0b3e45e3b46a5235b748cb85ad200c9ec1bb426)
Signed-off-by: Xiao Li <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2405afce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2405afce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2405afce

Branch: refs/heads/branch-2.2
Commit: 2405afce4e87c0486f2aef1d068f17aea2480b17
Parents: b7a2a16
Author: Kris Mok <[email protected]>
Authored: Wed May 24 17:19:35 2017 -0700
Committer: Xiao Li <[email protected]>
Committed: Wed May 24 17:19:46 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/exchange/ShuffleExchange.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2405afce/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
index f06544e..eebe6ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
@@ -40,6 +40,9 @@ case class ShuffleExchange(
     child: SparkPlan,
     @transient coordinator: Option[ExchangeCoordinator]) extends Exchange {
 
+  // NOTE: coordinator can be null after serialization/deserialization,
+  //       e.g. it can be null on the Executor side
+
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"))
 
@@ -47,7 +50,7 @@ case class ShuffleExchange(
     val extraInfo = coordinator match {
       case Some(exchangeCoordinator) =>
         s"(coordinator id: ${System.identityHashCode(exchangeCoordinator)})"
-      case None => ""
+      case _ => ""
     }
 
     val simpleNodeName = "Exchange"
@@ -70,7 +73,7 @@ case class ShuffleExchange(
     // the plan.
     coordinator match {
       case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case None =>
+      case _ =>
     }
   }
 
@@ -117,7 +120,7 @@ case class ShuffleExchange(
           val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
           assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
           shuffleRDD
-        case None =>
+        case _ =>
           val shuffleDependency = prepareShuffleDependency()
           preparePostShuffleRDD(shuffleDependency)
       }
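
----------------------------------------------------------------------

For context on the failure mode, here is a minimal standalone Scala sketch (not part of the patch; `Node` and `NullCoordinatorSketch` are hypothetical stand-ins for `ShuffleExchange`) showing why a `null` `Option` field makes a `Some`/`None` match non-exhaustive at runtime, and why `case _` covers it:

object NullCoordinatorSketch {
  // Hypothetical stand-in for ShuffleExchange: in Spark, the @transient
  // `coordinator` field is not restored when the plan is deserialized on an
  // Executor, so the field can come back as null rather than None.
  final case class Node(coordinator: Option[String]) {

    // Mirrors the pre-patch nodeName shape: only Some/None are covered, so a
    // null scrutinee matches neither case and throws scala.MatchError.
    def nodeNameInexhaustive: String = coordinator match {
      case Some(c) => s"Exchange(coordinator id: ${System.identityHashCode(c)})"
      case None => "Exchange"
    }

    // Mirrors the patched nodeName shape: the wildcard also matches null.
    def nodeNameExhaustive: String = coordinator match {
      case Some(c) => s"Exchange(coordinator id: ${System.identityHashCode(c)})"
      case _ => "Exchange"
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulate the post-deserialization state: the field is null, not None.
    val deserialized = Node(null)

    println(deserialized.nodeNameExhaustive) // prints "Exchange"

    try deserialized.nodeNameInexhaustive
    catch { case e: MatchError => println(s"pre-patch behaviour: $e") }
  }
}

Running the sketch prints "Exchange" from the patched-style match and a MatchError from the pre-patch one, which is the behaviour the one-liner in this commit fixes.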
