[
https://issues.apache.org/jira/browse/SPARK-50176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Peter Andrew updated SPARK-50176:
---------------------------------
Description:
When using PySpark Connect to run `writeTo(...).overwritePartitions(...)` commands,
I frequently see that for large queries the client gets stuck and never returns,
even though the SQL query is shown as successfully finished in the Spark UI and
the data has been written correctly.
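For context, the failing call pattern is essentially the following (a minimal sketch; the Connect endpoint and table names are placeholders, not taken from the actual workload):
{code:python}
from pyspark.sql import SparkSession

# Placeholder Spark Connect endpoint and table names, for illustration only.
spark = SparkSession.builder.remote("sc://connect-host:15002").getOrCreate()

df = spark.read.table("source_db.events")

# For large jobs, this call hangs on the client side and never returns,
# even though the Spark UI shows the underlying SQL query as finished
# and the target partitions have been overwritten correctly.
df.writeTo("target_db.events").overwritePartitions()
{code}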
This appears to be caused by the operation remaining stuck in `ReadyForExecution`
even though the SQL query has finished. As a result, the gRPC observer is never
notified, and a response is never sent to the client.
Even after the SQL query has finished, the following logs keep occurring over
and over again:
{{INFO ExecuteGrpcResponseSender: Starting for
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf, reattachable=true,
lastConsumedStreamIndex=0}}
{{TRACE ExecuteGrpcResponseSender: Trying to get next response with index=1.}}
{{TRACE ExecuteGrpcResponseSender: Acquired executionObserver lock.}}
{{TRACE ExecuteGrpcResponseSender: Try to get response with index=1 from
observer.}}
{{TRACE ExecuteGrpcResponseSender: Response index=1 from observer: false}}
{{TRACE ExecuteGrpcResponseSender: Wait for response to become available with
timeout=120000 ms.}}
The request is still shown in the Spark UI under the Connect tab, in the
Requests table, with status RUNNING.
Then, a short while after shutting down the client, I see the following:
{{INFO SparkConnectExecutionManager: Started periodic run of
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Finished periodic run of
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Started periodic run of
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Found execution ExecuteInfo(session_id:
"9075a58c-f298-496d-afea-c76fc09d37e3", ...,}}
{{ operation_id: "8ee873b7-32b9-4eb6-963f-21318e2aebcf"}}
{{
,,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
{{
SparkConnect_OperationTag_User__Session_9075a58c-f298-496d-afea-c76fc09d37e3_Operation_8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
{{ Set(),true,ReadyForExecution,1730282712338,Some(1730290272497),None)}}
{{ that was abandoned and expired and will be removed.}}
{{INFO SparkConnectExecutionManager: ExecuteHolder
ExecuteKey(,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf)
is removed.}}
{{INFO ExecuteResponseObserver: Release all for
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats:
total=CachedSize(0,0) autoRemoved=CachedSize(0,0)
cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0)
maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)}}
{{INFO ExecuteResponseObserver: Release all for
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats:
total=CachedSize(0,0) autoRemoved=CachedSize(0,0)
cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0)
maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)}}
{{java.lang.IllegalStateException: }}
{{ operationId: 8ee873b7-32b9-4eb6-963f-21318e2aebcf with status
ReadyForExecution}}
{{ is not within statuses List(Finished, Failed, Canceled) for event
Closed}}
{{ }}
{{ at
org.apache.spark.sql.connect.service.ExecuteEventsManager.assertStatus(ExecuteEventsManager.scala:261)}}
{{ at
org.apache.spark.sql.connect.service.ExecuteEventsManager.postClosed(ExecuteEventsManager.scala:229)}}
{{ at
org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1(ExecuteHolder.scala:240)}}
{{ at
org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1$adapted(ExecuteHolder.scala:234)}}
{{ at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)}}
{{ at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)}}
{{ at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)}}
{{ at java.base/java.lang.Thread.run(Thread.java:840)}}
My guess is that an exception occurs in
`SparkConnectPlanner.handleWriteOperationV2` after the query has succeeded, but
before `executeHolder.eventsManager.postFinished` is called. However, I have
not seen anything in the logs that would confirm this.
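To illustrate the hypothesis (a schematic Python sketch only; the real code path is the Scala server-side planner, and these class and method names are simplified stand-ins): if anything throws between the query completing and the finished event being posted, the operation's tracked status never leaves `ReadyForExecution`, the response observer is never completed, and the eventual cleanup hits exactly the assertion seen in the stack trace above.
{code:python}
# Schematic model of the hypothesized ordering problem. This is NOT the real
# Spark Connect implementation; it only mirrors the state machine implied by
# the error message and stack trace.

class EventsManager:
    def __init__(self):
        self.status = "ReadyForExecution"

    def post_finished(self):
        self.status = "Finished"

    def post_closed(self):
        # Mirrors ExecuteEventsManager.assertStatus: Closed is only legal
        # from Finished, Failed or Canceled.
        if self.status not in ("Finished", "Failed", "Canceled"):
            raise RuntimeError(
                f"status {self.status} is not within "
                "List(Finished, Failed, Canceled) for event Closed"
            )
        self.status = "Closed"


def handle_write(events, run_query, post_query_step):
    run_query()             # the SQL query itself succeeds
    post_query_step()       # hypothesis: an exception is raised here ...
    events.post_finished()  # ... so this is never reached


def failing_step():
    raise RuntimeError("swallowed somewhere; nothing visible in the logs")


events = EventsManager()
try:
    handle_write(events, run_query=lambda: None, post_query_step=failing_step)
except RuntimeError:
    pass  # the client never gets a response; status stays ReadyForExecution

# Much later, the maintenance thread closes the abandoned execution and
# trips the same IllegalStateException seen above.
try:
    events.post_closed()
except RuntimeError as e:
    print(e)  # status ReadyForExecution is not within List(...) for event Closed
{code}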
I have not been able to construct a proof of concept that reproduces the issue,
but it occurs very often on real workloads. I am unclear on the trigger: it
almost never happens on 'small' jobs (say, under 30 minutes and fewer than
100,000 partitions), but happens often (roughly one in five runs) on larger jobs.
cc [~juliuszsompolski]
> Connect operation stuck in `ReadyForExecution` even though query has finished
> -----------------------------------------------------------------------------
>
> Key: SPARK-50176
> URL: https://issues.apache.org/jira/browse/SPARK-50176
> Project: Spark
> Issue Type: Bug
> Components: Connect
> Affects Versions: 3.5.2
> Reporter: Peter Andrew
> Priority: Major
>