Repository: spark
Updated Branches:
refs/heads/master 044b33b2e -> 79c668942
[SPARK-24757][SQL] Improving the error message for broadcast timeouts
## What changes were proposed in this pull request?
In this PR, I propose adding a hint to the error message for broadcast join
timeouts that tells users how to resolve the issue. In particular, they can
increase the timeout via **spark.sql.broadcastTimeout** or disable broadcast
joins entirely by setting **spark.sql.autoBroadcastJoinThreshold** to `-1`.
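To make the `-1` hint concrete, here is a minimal Scala sketch of the size check that governs automatic broadcast joins. It assumes, in simplified form and roughly mirroring Spark's join planning, that a relation qualifies only when its estimated size is non-negative and at most `spark.sql.autoBroadcastJoinThreshold` bytes. `BroadcastThresholdSketch` and `canBroadcastBySize` are hypothetical names, and sizes are modeled as `Long` rather than Spark's `BigInt` statistics.

```scala
// Hypothetical, simplified model of the size check behind automatic broadcast joins.
object BroadcastThresholdSketch {
  // Documented default of spark.sql.autoBroadcastJoinThreshold: 10 MB, in bytes.
  val DefaultThresholdBytes: Long = 10L * 1024 * 1024

  // A relation is a broadcast candidate only if its estimated size is known
  // (non-negative) and does not exceed the threshold. With a threshold of -1,
  // no non-negative size can pass, so automatic broadcast joins are disabled.
  def canBroadcastBySize(sizeInBytes: Long, thresholdBytes: Long): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= thresholdBytes
}
```

For example, a 1 KB relation qualifies under the default threshold, while no relation qualifies once the threshold is `-1`.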
## How was this patch tested?
It was tested manually from `spark-shell`:
```
scala> spark.conf.set("spark.sql.broadcastTimeout", 1)
scala> val df = spark.range(100).join(spark.range(15).as[Long].map { x =>
  Thread.sleep(5000)
  x
}).where("id = value")
scala> df.count()
```
```
org.apache.spark.SparkException: Could not execute broadcast in 1 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
```
Author: Maxim Gekk <[email protected]>
Closes #21727 from MaxGekk/broadcast-timeout-error.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79c66894
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79c66894
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79c66894
Branch: refs/heads/master
Commit: 79c66894296840cc4a5bf6c8718ecfd2b08bcca8
Parents: 044b33b
Author: Maxim Gekk <[email protected]>
Authored: Sat Jul 7 22:16:48 2018 +0200
Committer: Herman van Hovell <[email protected]>
Committed: Sat Jul 7 22:16:48 2018 +0200
----------------------------------------------------------------------
.../sql/execution/exchange/BroadcastExchangeExec.scala | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/79c66894/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c55f9b8..a80673c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.exchange
+import java.util.concurrent.TimeoutException
+
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal
@@ -140,7 +142,16 @@ case class BroadcastExchangeExec(
   }
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    try {
+      ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    } catch {
+      case ex: TimeoutException =>
+        logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
+        throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
+          s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
+          s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
+          ex)
+    }
   }
 }
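The change follows a general pattern: catch the low-level timeout at the point where the relevant configuration is known, and rethrow it with an actionable message while keeping the original exception as the cause. Below is a minimal, Spark-free Scala sketch of that pattern, assuming `scala.concurrent.Await.result` in place of Spark's `ThreadUtils.awaitResult` (which, as far as we can tell, lets a `TimeoutException` propagate unwrapped, so the patch can catch it directly). `ActionableTimeoutException` and `BroadcastTimeoutSketch` are hypothetical stand-ins for `SparkException` and `BroadcastExchangeExec`; the config keys are hard-coded instead of being read from `SQLConf`, and the `logError` call is omitted.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for org.apache.spark.SparkException.
class ActionableTimeoutException(message: String, cause: Throwable)
  extends Exception(message, cause)

object BroadcastTimeoutSketch {
  // Config keys named in the hint (the real patch reads them from SQLConf).
  val BroadcastTimeoutKey = "spark.sql.broadcastTimeout"
  val AutoBroadcastJoinThresholdKey = "spark.sql.autoBroadcastJoinThreshold"

  // Waits for the (hypothetical) broadcast relation; on timeout, rethrows with a
  // message that tells the user which settings to change, keeping the cause.
  def awaitBroadcast[T](relation: Future[T], timeout: FiniteDuration): T =
    try {
      Await.result(relation, timeout)
    } catch {
      case ex: TimeoutException =>
        throw new ActionableTimeoutException(
          s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
            s"You can increase the timeout for broadcasts via $BroadcastTimeoutKey or " +
            s"disable broadcast join by setting $AutoBroadcastJoinThresholdKey to -1",
          ex)
    }
}
```

Awaiting a future that never completes, such as `Promise[Int]().future`, with `1.second` yields an exception whose message starts with `Could not execute broadcast in 1 secs.` and whose cause is the original `TimeoutException`.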