[
https://issues.apache.org/jira/browse/SPARK-41129?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17754048#comment-17754048
]
JacobZheng commented on SPARK-41129:
------------------------------------
I'm experiencing exactly the same issue, and I'm not sure whether it is related to
concurrency. Did you find a cause of, or a solution to, the problem?
> When multiple SQLs are concurrent, the driver subquery thread is permanently
> locked
> -----------------------------------------------------------------------------------
>
> Key: SPARK-41129
> URL: https://issues.apache.org/jira/browse/SPARK-41129
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.3.0
> Reporter: Roy
> Priority: Major
> Attachments: locksqlPNG.PNG, normaljobs.PNG
>
>
> When a SQL query runs at a low concurrency level (10), it generates
> 11 jobs and executes smoothly (please refer to the attached picture).
> But when I increased the concurrency level to 20, each SQL query executed only
> its first job and then stopped (please refer to the attached picture).
> Looking at the driver thread dump, I found that the subquery threads (20
> threads) are blocked; details below.
>
> {code:java}
> Monitor(org.apache.spark.sql.execution.aggregate.HashAggregateExec@1335537910}),
> Lock(java.util.concurrent.ThreadPoolExecutor$Worker@1502413281}),
> Monitor(org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec@890810300}),
> Monitor(java.lang.Object@603970601}),
> Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
> {code}
> {code:java}
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
> java.util.concurrent.FutureTask.get(FutureTask.java:191)
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
> org.apache.spark.sql.execution.SubqueryExec.executeCollect(basicPhysicalOperators.scala:861)
>
> org.apache.spark.sql.execution.ScalarSubquery.updateResult(subquery.scala:80)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1(SparkPlan.scala:262)
>
> org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1$adapted(SparkPlan.scala:261)
>
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3650/586819338.apply(Unknown
> Source)
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> org.apache.spark.sql.execution.SparkPlan.waitForSubqueries(SparkPlan.scala:261)
> => holding
> Monitor(org.apache.spark.sql.execution.aggregate.HashAggregateExec@1335537910})
>
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:231)
>
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown
> Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:47)
>
> org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
>
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
>
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
>
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3644/556844527.apply(Unknown
> Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown
> Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD$lzycompute(ShuffleExchangeExec.scala:135)
> => holding
> Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
>
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.inputRDD(ShuffleExchangeExec.scala:135)
>
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture$lzycompute(ShuffleExchangeExec.scala:140)
> => holding
> Monitor(org.apache.spark.sql.execution.exchange.ShuffleExchangeExec@2042514973})
>
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.mapOutputStatisticsFuture(ShuffleExchangeExec.scala:139)
>
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
>
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike$$Lambda$3671/828122256.apply(Unknown
> Source)
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>
> org.apache.spark.sql.execution.SparkPlan$$Lambda$3645/1297667696.apply(Unknown
> Source)
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
>
> org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
>
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.submitShuffleJob(ShuffleExchangeExec.scala:115)
>
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:174)
> => holding
> Monitor(org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec@890810300})
>
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:174)
>
> org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.doMaterialize(QueryStageExec.scala:176)
>
> org.apache.spark.sql.execution.adaptive.QueryStageExec.materialize(QueryStageExec.scala:82)
>
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:258)
>
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:256)
>
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3669/118271447.apply(Unknown
> Source) scala.collection.Iterator.foreach(Iterator.scala:943)
> scala.collection.Iterator.foreach$(Iterator.scala:943)
> scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> scala.collection.IterableLike.foreach(IterableLike.scala:74)
> scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:256)
>
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$Lambda$3616/1041219151.apply(Unknown
> Source) org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:228)
> => holding Monitor(java.lang.Object@603970601})
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:367)
>
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeTake(AdaptiveSparkPlanExec.scala:344)
>
> org.apache.spark.sql.execution.SubqueryExec.$anonfun$relationFuture$2(basicPhysicalOperators.scala:834)
>
> org.apache.spark.sql.execution.SubqueryExec$$Lambda$3652/1074832567.apply(Unknown
> Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:145)
>
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$3653/1322734277.apply(Unknown
> Source)
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
>
> org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:143)
>
> org.apache.spark.sql.execution.SubqueryExec.$anonfun$relationFuture$1(basicPhysicalOperators.scala:830)
>
> org.apache.spark.sql.execution.SubqueryExec$$Lambda$3648/502350376.apply(Unknown
> Source)
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withThreadLocalCaptured$1(SQLExecution.scala:191)
>
> org.apache.spark.sql.execution.SQLExecution$$$Lambda$3649/2139778019.call(Unknown
> Source) java.util.concurrent.FutureTask.run(FutureTask.java:266)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> java.lang.Thread.run(Thread.java:750)
> {code}
> I'm not sure what is causing this. Please let me know if you need any more information — thanks!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]