This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1785ead [SPARK-36414][SQL] Disable timeout for
BroadcastQueryStageExec in AQE
1785ead is described below
commit 1785ead733d3c8bd6511f2dff231b0c5de53e270
Author: Kent Yao <[email protected]>
AuthorDate: Thu Aug 5 21:15:35 2021 +0800
[SPARK-36414][SQL] Disable timeout for BroadcastQueryStageExec in AQE
### What changes were proposed in this pull request?
This reverts SPARK-31475, because in AQE mode there are always more concurrent
jobs running, especially when multiple queries run at the same time.
Currently, the broadcast timeout does not measure only the execution time of
the BroadcastQueryStageExec; it also includes the time spent waiting to be
scheduled. If all resources are occupied materializing other stages, the
broadcast stage can time out without ever getting a chance to run.

The default value is 300s, and it is hard to tune the timeout for AQE mode:
real-world workloads usually need an extremely large value. In the example
referenced in the original pull request description (not reproduced in this
message), the timeout was set to 1800s, and it still needed roughly 3x more.
### Why are the changes needed?
AQE is now enabled by default, and this PR makes it more stable.
### Does this PR introduce _any_ user-facing change?
Yes. The broadcast timeout is no longer applied to broadcast query stages in AQE mode.
### How was this patch tested?
Modified the existing "Broadcast timeout" test in BroadcastJoinSuite.
Closes #33636 from yaooqinn/SPARK-36414.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 0c94e47aecab0a8c346e1a004686d1496a9f2b07)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/execution/adaptive/QueryStageExec.scala | 26 ++--------------------
.../sql/execution/joins/BroadcastJoinSuite.scala | 10 +++++----
2 files changed, 8 insertions(+), 28 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index f308829..e2f763e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -17,10 +17,9 @@
package org.apache.spark.sql.execution.adaptive
-import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.{Future, Promise}
+import scala.concurrent.Future
import org.apache.spark.{FutureAction, MapOutputStatistics}
import org.apache.spark.broadcast.Broadcast
@@ -29,11 +28,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.exchange._
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.ThreadUtils
/**
* A query stage is an independent subgraph of the query plan. Query stage
materializes its output
@@ -221,22 +218,8 @@ case class BroadcastQueryStageExec(
throw new IllegalStateException(s"wrong plan for broadcast stage:\n
${plan.treeString}")
}
- @transient private lazy val materializeWithTimeout = {
- val broadcastFuture = broadcast.submitBroadcastJob
- val timeout = conf.broadcastTimeout
- val promise = Promise[Any]()
- val fail = BroadcastQueryStageExec.scheduledExecutor.schedule(new
Runnable() {
- override def run(): Unit = {
-
promise.tryFailure(QueryExecutionErrors.executeBroadcastTimeoutError(timeout,
None))
- }
- }, timeout, TimeUnit.SECONDS)
- broadcastFuture.onComplete(_ =>
fail.cancel(false))(AdaptiveSparkPlanExec.executionContext)
- Future.firstCompletedOf(
- Seq(broadcastFuture,
promise.future))(AdaptiveSparkPlanExec.executionContext)
- }
-
override def doMaterialize(): Future[Any] = {
- materializeWithTimeout
+ broadcast.submitBroadcastJob
}
override def newReuseInstance(newStageId: Int, newOutput: Seq[Attribute]):
QueryStageExec = {
@@ -257,8 +240,3 @@ case class BroadcastQueryStageExec(
override def getRuntimeStatistics: Statistics = broadcast.runtimeStatistics
}
-
-object BroadcastQueryStageExec {
- private val scheduledExecutor =
- ThreadUtils.newDaemonSingleThreadScheduledExecutor("BroadcastStageTimeout")
-}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 83163cf..dd6a412 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -415,15 +415,17 @@ abstract class BroadcastJoinSuiteBase extends QueryTest
with SQLTestUtils
test("Broadcast timeout") {
val timeout = 5
- val slowUDF = udf({ x: Int => Thread.sleep(timeout * 10 * 1000); x })
+ val slowUDF = udf({ x: Int => Thread.sleep(timeout * 1000); x })
val df1 = spark.range(10).select($"id" as 'a)
val df2 = spark.range(5).select(slowUDF($"id") as 'a)
val testDf = df1.join(broadcast(df2), "a")
withSQLConf(SQLConf.BROADCAST_TIMEOUT.key -> timeout.toString) {
- val e = intercept[Exception] {
- testDf.collect()
+ if (!conf.adaptiveExecutionEnabled) {
+ val e = intercept[Exception] {
+ testDf.collect()
+ }
+ assert(e.getMessage.contains(s"Could not execute broadcast in $timeout
secs."))
}
- assert(e.getMessage.contains(s"Could not execute broadcast in $timeout
secs."))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]