Repository: spark
Updated Branches:
  refs/heads/master 044b33b2e -> 79c668942
[SPARK-24757][SQL] Improving the error message for broadcast timeouts

## What changes were proposed in this pull request?

In the PR, I propose to give users a tip on how to resolve a timeout expiration for broadcast joins. In particular, they can increase the timeout via **spark.sql.broadcastTimeout** or disable broadcast joins altogether by setting **spark.sql.autoBroadcastJoinThreshold** to `-1`.

## How was this patch tested?

It was tested manually from `spark-shell`:

```
scala> spark.conf.set("spark.sql.broadcastTimeout", 1)

scala> val df = spark.range(100).join(spark.range(15).as[Long].map { x =>
  Thread.sleep(5000)
  x
}).where("id = value")

scala> df.count()
```

```
org.apache.spark.SparkException: Could not execute broadcast in 1 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
```

Author: Maxim Gekk <maxim.g...@databricks.com>

Closes #21727 from MaxGekk/broadcast-timeout-error.
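The change boils down to one pattern: wait on the future that builds the broadcast relation, catch the `TimeoutException` raised when the wait expires, and rethrow it with a message that names the two settings a user can change. Below is a minimal, stdlib-only sketch of that pattern. It assumes `scala.concurrent.Await.result` in place of Spark's `ThreadUtils.awaitResult` and a hypothetical `BroadcastTimeoutError` in place of `SparkException`; the names `BroadcastTimeoutSketch` and `awaitBroadcast` are illustrative only and are not Spark API.

```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for org.apache.spark.SparkException, so the sketch needs no Spark dependency.
class BroadcastTimeoutError(message: String, cause: Throwable)
  extends RuntimeException(message, cause)

object BroadcastTimeoutSketch {
  // Config keys copied from the commit message; in Spark they come from SQLConf.
  val BroadcastTimeoutKey = "spark.sql.broadcastTimeout"
  val AutoBroadcastJoinThresholdKey = "spark.sql.autoBroadcastJoinThreshold"

  // Waits at most `timeout` for `relationFuture`; on expiry, rethrows with an actionable hint.
  def awaitBroadcast[T](relationFuture: Future[T], timeout: FiniteDuration): T = {
    try {
      Await.result(relationFuture, timeout)
    } catch {
      case ex: TimeoutException =>
        // Keep the original exception as the cause so its stack trace is not lost.
        throw new BroadcastTimeoutError(
          s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
            s"You can increase the timeout for broadcasts via $BroadcastTimeoutKey or " +
            s"disable broadcast join by setting $AutoBroadcastJoinThresholdKey to -1",
          ex)
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // Simulates a slow broadcast build, like the Thread.sleep(5000) in the spark-shell repro.
    val slow = Future { Thread.sleep(5000); 42L }
    try {
      awaitBroadcast(slow, 1.second)
    } catch {
      case e: BroadcastTimeoutError => println(e.getMessage)
    }
  }
}
```

Passing the caught exception as the cause mirrors the patch, which hands `ex` to the new `SparkException`; the patch additionally logs the timeout with `logError` before rethrowing.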
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79c66894
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79c66894
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79c66894

Branch: refs/heads/master
Commit: 79c66894296840cc4a5bf6c8718ecfd2b08bcca8
Parents: 044b33b
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Sat Jul 7 22:16:48 2018 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Sat Jul 7 22:16:48 2018 +0200

----------------------------------------------------------------------
 .../sql/execution/exchange/BroadcastExchangeExec.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79c66894/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c55f9b8..a80673c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import java.util.concurrent.TimeoutException
+
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
@@ -140,7 +142,16 @@ case class BroadcastExchangeExec(
   }
 
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    try {
+      ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    } catch {
+      case ex: TimeoutException =>
+        logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
+        throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
+          s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
+          s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
+          ex)
+    }
   }
 }