Repository: spark Updated Branches: refs/heads/master a66c23e13 -> fa66ef6c9
[SPARK-4269][SQL] make wait time configurable in BroadcastHashJoin BroadcastHashJoin currently uses a hard-coded value (5 minutes) to wait for the execution and broadcast of the small table. In my opinion, it should be a configurable value, since the broadcast may exceed 5 minutes in some cases, such as in a busy or congested network environment. Author: Jacky Li <jacky.li...@huawei.com> Closes #3133 from jackylk/timeout-config and squashes the following commits: 733ac08 [Jacky Li] add spark.sql.broadcastTimeout in SQLConf.scala 557acd4 [Jacky Li] switch to sqlContext.getConf 81a5e20 [Jacky Li] make wait time configurable in BroadcastHashJoin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa66ef6c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa66ef6c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa66ef6c Branch: refs/heads/master Commit: fa66ef6c97e87c9255b67b03836a4ba50598ebae Parents: a66c23e Author: Jacky Li <jacky.li...@huawei.com> Authored: Tue Dec 16 15:34:59 2014 -0800 Committer: Michael Armbrust <mich...@databricks.com> Committed: Tue Dec 16 15:34:59 2014 -0800 ---------------------------------------------------------------------- .../src/main/scala/org/apache/spark/sql/SQLConf.scala | 7 +++++++ .../spark/sql/execution/joins/BroadcastHashJoin.scala | 11 ++++++++++- 2 files changed, 17 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fa66ef6c/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 9697beb..f5abf71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -38,6 
+38,7 @@ private[spark] object SQLConf { val PARQUET_FILTER_PUSHDOWN_ENABLED = "spark.sql.parquet.filterPushdown" val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord" + val BROADCAST_TIMEOUT = "spark.sql.broadcastTimeout" // Options that control which operators can be chosen by the query planner. These should be // considered hints and may be ignored by future versions of Spark SQL. @@ -148,6 +149,12 @@ private[sql] trait SQLConf { private[spark] def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD, "_corrupt_record") + /** + * Timeout in seconds for the broadcast wait time in hash join + */ + private[spark] def broadcastTimeout: Int = + getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ http://git-wip-us.apache.org/repos/asf/spark/blob/fa66ef6c/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index 5cf2a78..fbe1d06 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -42,6 +42,15 @@ case class BroadcastHashJoin( right: SparkPlan) extends BinaryNode with HashJoin { + val timeout = { + val timeoutValue = sqlContext.broadcastTimeout + if (timeoutValue < 0) { + Duration.Inf + } else { + timeoutValue.seconds + } + } + override def outputPartitioning: Partitioning = streamedPlan.outputPartitioning override def requiredChildDistribution = @@ -56,7 +65,7 @@ case class BroadcastHashJoin( } override def execute() = { - val broadcastRelation = Await.result(broadcastFuture, 5.minute) + 
val broadcastRelation = Await.result(broadcastFuture, timeout) streamedPlan.execute().mapPartitions { streamedIter => hashJoin(streamedIter, broadcastRelation.value) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org