Repository: spark
Updated Branches:
  refs/heads/master 044b33b2e -> 79c668942


[SPARK-24757][SQL] Improving the error message for broadcast timeouts

## What changes were proposed in this pull request?

This PR adds a hint to the broadcast join timeout error that tells the user how 
to resolve it. They can either increase the timeout via 
**spark.sql.broadcastTimeout** or disable broadcast joins altogether by 
setting **spark.sql.autoBroadcastJoinThreshold** to `-1`.
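
For illustration, a minimal sketch of both options from `spark-shell`, assuming an active `SparkSession` named `spark`. The value `600` is an arbitrary example, not a recommendation:
```scala
// Option 1: give broadcasts more time. The value is in seconds; 600 is illustrative only.
spark.conf.set("spark.sql.broadcastTimeout", 600)

// Option 2: disable automatic broadcast joins entirely.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
```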

## How was this patch tested?

It was tested manually from `spark-shell`:
```
scala> spark.conf.set("spark.sql.broadcastTimeout", 1)
scala> val df = spark.range(100).join(spark.range(15).as[Long].map { x =>
               Thread.sleep(5000)
               x
            }).where("id = value")
scala> df.count()
```
```
org.apache.spark.SparkException: Could not execute broadcast in 1 secs. You can 
increase the timeout for broadcasts via spark.sql.broadcastTimeout or disable 
broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
  at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:150)
```
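
The same catch-and-rethrow pattern can be sketched outside Spark with plain Scala futures. This is only an approximation under stated assumptions: `HelpfulTimeoutException` and `awaitWithHint` are hypothetical names standing in for `SparkException` and `doExecuteBroadcast`, and `Await.result` stands in for `ThreadUtils.awaitResult`:
```scala
import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BroadcastTimeoutSketch {
  // Hypothetical stand-in for SparkException, so the sketch runs without Spark.
  final class HelpfulTimeoutException(message: String, cause: Throwable)
    extends RuntimeException(message, cause)

  // Waits for the future and, on timeout, rethrows with a hint about the relevant settings.
  def awaitWithHint[T](future: Future[T], timeout: FiniteDuration): T = {
    try {
      Await.result(future, timeout)
    } catch {
      case ex: TimeoutException =>
        throw new HelpfulTimeoutException(
          s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
            "You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or " +
            "disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1",
          ex) // keep the original TimeoutException as the cause
    }
  }

  def main(args: Array[String]): Unit = {
    // A task that takes longer than the one-second timeout used below.
    val slow = Future { Thread.sleep(3000); 42 }
    try {
      val value = awaitWithHint(slow, 1.second)
      println(s"Finished with $value")
    } catch {
      case e: HelpfulTimeoutException => println(e.getMessage)
    }
  }
}
```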

Author: Maxim Gekk <maxim.g...@databricks.com>

Closes #21727 from MaxGekk/broadcast-timeout-error.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79c66894
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79c66894
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79c66894

Branch: refs/heads/master
Commit: 79c66894296840cc4a5bf6c8718ecfd2b08bcca8
Parents: 044b33b
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Sat Jul 7 22:16:48 2018 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Sat Jul 7 22:16:48 2018 +0200

----------------------------------------------------------------------
 .../sql/execution/exchange/BroadcastExchangeExec.scala | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/79c66894/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index c55f9b8..a80673c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import java.util.concurrent.TimeoutException
+
 import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
@@ -140,7 +142,16 @@ case class BroadcastExchangeExec(
   }
 
   override protected[sql] def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
-    ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    try {
+      ThreadUtils.awaitResult(relationFuture, timeout).asInstanceOf[broadcast.Broadcast[T]]
+    } catch {
+      case ex: TimeoutException =>
+        logError(s"Could not execute broadcast in ${timeout.toSeconds} secs.", ex)
+        throw new SparkException(s"Could not execute broadcast in ${timeout.toSeconds} secs. " +
+          s"You can increase the timeout for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or " +
+          s"disable broadcast join by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1",
+          ex)
+    }
   }
 }
 

