[ 
https://issues.apache.org/jira/browse/SPARK-50176?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Peter Andrew updated SPARK-50176:
---------------------------------
    Description: 
Using PySpark Connect to run `writeTo(...).overwritePartitions(...)` commands, 
I frequently see the client get stuck on large queries and never return, even 
though the SQL query is shown as successfully finished in the Spark UI and the 
data has been correctly written.
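
For concreteness, the call pattern in question looks roughly like the sketch below. The helper name `overwrite_partitions` is hypothetical and only wraps the two PySpark calls named above (`DataFrame.writeTo` and `DataFrameWriterV2.overwritePartitions`); the actual jobs are not shown in this report.

```python
def overwrite_partitions(df, table_name):
    """Overwrite the partitions of `table_name` for which `df` contains rows.

    `df` is expected to be a PySpark (Connect) DataFrame. The call blocks
    until the server sends back a response, which is the point where the
    client is observed to hang.
    """
    # writeTo(...) returns a DataFrameWriterV2; overwritePartitions() runs the write.
    return df.writeTo(table_name).overwritePartitions()
```

With a Spark Connect session `spark`, this would be invoked as something like `overwrite_partitions(spark.table("source"), "catalog.db.target")`, where both table names are placeholders.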

This seems to be due to the operation staying stuck in `ReadyForExecution`, even 
though the SQL query has finished. Then, the gRPC observer is never notified, 
and thus a response is never sent to the client.

Even after the SQL query has finished, the following logs keep occurring over 
and over again:

{{INFO ExecuteGrpcResponseSender: Starting for 
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf, reattachable=true, 
lastConsumedStreamIndex=0}}
{{TRACE ExecuteGrpcResponseSender: Trying to get next response with index=1.}}
{{TRACE ExecuteGrpcResponseSender: Acquired executionObserver lock.}}
{{TRACE ExecuteGrpcResponseSender: Try to get response with index=1 from 
observer.}}
{{TRACE ExecuteGrpcResponseSender: Response index=1 from observer: false}}
{{TRACE ExecuteGrpcResponseSender: Wait for response to become available with 
timeout=120000 ms.}}
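
The repeating log pattern above is what a timed wait on a never-notified observer would look like. The following is only a toy model in Python, not the actual Scala implementation of `ExecuteGrpcResponseSender`; the class and function names (`ResponseObserver`, `send_responses`) are made up for illustration.

```python
import threading


class ResponseObserver:
    """Toy stand-in for the server-side observer that buffers responses."""

    def __init__(self):
        self._cond = threading.Condition()
        self._responses = {}  # stream index -> response

    def produce(self, index, response):
        # Called by the execution thread when a response is ready.
        with self._cond:
            self._responses[index] = response
            self._cond.notify_all()

    def wait_for(self, index, timeout_s):
        # Returns the response, or None if nothing arrived before the timeout.
        with self._cond:
            self._cond.wait_for(lambda: index in self._responses, timeout=timeout_s)
            return self._responses.get(index)


def send_responses(observer, index, timeout_s, max_attempts):
    """Wait for one response, retrying after every timeout."""
    attempts = 0
    while attempts < max_attempts:
        attempts += 1
        response = observer.wait_for(index, timeout_s)
        if response is not None:
            return response, attempts
    return None, attempts
```

If `produce` is never called for the requested index, every attempt times out and the loop simply starts over, which matches the "Wait for response to become available" line repeating indefinitely.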

The request is still shown in the Spark UI under the Connect tab in the 
Requests table, with status RUNNING.

Then, a short while after shutting down the client, I see the following:

{{INFO SparkConnectExecutionManager: Started periodic run of 
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Finished periodic run of 
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Started periodic run of 
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Found execution ExecuteInfo(session_id: 
"9075a58c-f298-496d-afea-c76fc09d37e3", ...,}}
{{    operation_id: "8ee873b7-32b9-4eb6-963f-21318e2aebcf"}}
{{    
,,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
{{    
SparkConnect_OperationTag_User__Session_9075a58c-f298-496d-afea-c76fc09d37e3_Operation_8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
{{    Set(),true,ReadyForExecution,1730282712338,Some(1730290272497),None)}}
{{    that was abandoned and expired and will be removed.}}
{{INFO SparkConnectExecutionManager: ExecuteHolder 
ExecuteKey(,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf)
 is removed.}}
{{INFO ExecuteResponseObserver: Release all for 
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: 
total=CachedSize(0,0) autoRemoved=CachedSize(0,0) 
cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) 
maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)}}
{{INFO ExecuteResponseObserver: Release all for 
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: 
total=CachedSize(0,0) autoRemoved=CachedSize(0,0) 
cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) 
maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)}}
{{java.lang.IllegalStateException: }}
{{        operationId: 8ee873b7-32b9-4eb6-963f-21318e2aebcf with status 
ReadyForExecution}}
{{        is not within statuses List(Finished, Failed, Canceled) for event 
Closed}}
{{        }}
{{    at 
org.apache.spark.sql.connect.service.ExecuteEventsManager.assertStatus(ExecuteEventsManager.scala:261)}}
{{    at 
org.apache.spark.sql.connect.service.ExecuteEventsManager.postClosed(ExecuteEventsManager.scala:229)}}
{{    at 
org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1(ExecuteHolder.scala:240)}}
{{    at 
org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1$adapted(ExecuteHolder.scala:234)}}
{{    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)}}
{{    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)}}
{{    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)}}
{{    at java.base/java.lang.Thread.run(Thread.java:840)}}
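
As a side note on the `ExecuteInfo` line above: the two numeric fields after `ReadyForExecution` look like epoch-millisecond timestamps. Assuming (the log itself does not label them) that the first is the creation time of the operation and the second the time of the last attached RPC, the gap between them can be computed as in this small sketch:

```python
from datetime import datetime, timezone


def epoch_ms_to_utc(ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


# Values copied from the ExecuteInfo log line; their meaning is an assumption.
created_ms = 1730282712338
last_attached_ms = 1730290272497

elapsed_ms = last_attached_ms - created_ms
elapsed_minutes = elapsed_ms / 60_000

print(epoch_ms_to_utc(created_ms), epoch_ms_to_utc(last_attached_ms), elapsed_minutes)
```

Under that assumption the operation had been open for roughly 126 minutes when it was last attached to.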

My guess is that an exception occurs in 
`SparkConnectPlanner.handleWriteOperationV2` after the query has succeeded, but 
before we get to call `executeHolder.eventsManager.postFinished`. However, I 
have not seen anything in the logs.
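
To illustrate this guess, here is a deliberately simplified Python model of the status checks. It is not the real `ExecuteEventsManager`; the class, the `after_query` hook, and the set of statuses are assumptions reduced to what appears in the stack trace above.

```python
TERMINAL = ("Finished", "Failed", "Canceled")


class EventsManager:
    """Simplified model of the status assertions seen in the stack trace."""

    def __init__(self):
        self.status = "ReadyForExecution"

    def _assert_status(self, allowed, event):
        if self.status not in allowed:
            raise RuntimeError(
                f"status {self.status} is not within statuses "
                f"{list(allowed)} for event {event}"
            )

    def post_finished(self):
        self._assert_status(("ReadyForExecution",), "Finished")
        self.status = "Finished"

    def post_closed(self):
        self._assert_status(TERMINAL, "Closed")
        self.status = "Closed"


def handle_write(events, run_query, after_query):
    run_query()             # the write itself succeeds
    after_query()           # hypothetical step that might throw
    events.post_finished()  # never reached if after_query() raises
```

If `after_query` raises, the status stays `ReadyForExecution`, and a later `post_closed()` fails with the same kind of message as the `IllegalStateException` above. Whether such an exception actually occurs on the server is unconfirmed.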

I have not been able to construct a proof-of-concept to reproduce, but this 
occurs very often on real workloads. I am unclear on the trigger: this almost 
never occurs on 'small' jobs (meaning < 30mins and < 100,000 partitions, say), 
but often (say, one in five) on larger jobs.

 

cc [~juliuszsompolski]


> Connect operation stuck in `ReadyForExecution` even though query has finished
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-50176
>                 URL: https://issues.apache.org/jira/browse/SPARK-50176
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect
>    Affects Versions: 3.5.2
>            Reporter: Peter Andrew
>            Priority: Major
>
> Using PySpark Connect to run `writeTo(...).overwritePartitions(...)` 
> commands, I frequently see the client get stuck on large queries and never 
> return, even though the SQL query is shown as successfully finished in the 
> Spark UI and the data has been correctly written.
> This seems to be due to the operation staying stuck in `ReadyForExecution`, even 
> though the SQL query has finished. Then, the gRPC observer is never notified, 
> and thus a response is never sent to the client.
> Even after the SQL query has finished, the following logs keep occurring over 
> and over again:
> {{INFO ExecuteGrpcResponseSender: Starting for 
> opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf, reattachable=true, 
> lastConsumedStreamIndex=0}}
> {{TRACE ExecuteGrpcResponseSender: Trying to get next response with index=1.}}
> {{TRACE ExecuteGrpcResponseSender: Acquired executionObserver lock.}}
> {{TRACE ExecuteGrpcResponseSender: Try to get response with index=1 from 
> observer.}}
> {{TRACE ExecuteGrpcResponseSender: Response index=1 from observer: false}}
> {{TRACE ExecuteGrpcResponseSender: Wait for response to become available 
> with timeout=120000 ms.}}
> The request is still shown in the Spark UI under the Connect tab in the 
> Requests table, with status RUNNING.
> Then, a short while after shutting down the client, I see the following:
> {{INFO SparkConnectExecutionManager: Started periodic run of 
> SparkConnectExecutionManager maintenance.}}
> {{INFO SparkConnectExecutionManager: Finished periodic run of 
> SparkConnectExecutionManager maintenance.}}
> {{INFO SparkConnectExecutionManager: Started periodic run of 
> SparkConnectExecutionManager maintenance.}}
> {{INFO SparkConnectExecutionManager: Found execution ExecuteInfo(session_id: 
> "9075a58c-f298-496d-afea-c76fc09d37e3", ...,}}
> {{    operation_id: "8ee873b7-32b9-4eb6-963f-21318e2aebcf"}}
> {{    
> ,,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
> {{    
> SparkConnect_OperationTag_User__Session_9075a58c-f298-496d-afea-c76fc09d37e3_Operation_8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
> {{    Set(),true,ReadyForExecution,1730282712338,Some(1730290272497),None)}}
> {{    that was abandoned and expired and will be removed.}}
> {{INFO SparkConnectExecutionManager: ExecuteHolder 
> ExecuteKey(,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf)
>  is removed.}}
> {{INFO ExecuteResponseObserver: Release all for 
> opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: 
> total=CachedSize(0,0) autoRemoved=CachedSize(0,0) 
> cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) 
> maxCachedUntilConsumed=CachedSize(0,0) 
> maxCachedUntilProduced=CachedSize(0,0)}}
> {{INFO ExecuteResponseObserver: Release all for 
> opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: 
> total=CachedSize(0,0) autoRemoved=CachedSize(0,0) 
> cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) 
> maxCachedUntilConsumed=CachedSize(0,0) 
> maxCachedUntilProduced=CachedSize(0,0)}}
> {{java.lang.IllegalStateException: }}
> {{        operationId: 8ee873b7-32b9-4eb6-963f-21318e2aebcf with status 
> ReadyForExecution}}
> {{        is not within statuses List(Finished, Failed, Canceled) for event 
> Closed}}
> {{        }}
> {{    at 
> org.apache.spark.sql.connect.service.ExecuteEventsManager.assertStatus(ExecuteEventsManager.scala:261)}}
> {{    at 
> org.apache.spark.sql.connect.service.ExecuteEventsManager.postClosed(ExecuteEventsManager.scala:229)}}
> {{    at 
> org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1(ExecuteHolder.scala:240)}}
> {{    at 
> org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1$adapted(ExecuteHolder.scala:234)}}
> {{    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)}}
> {{    at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)}}
> {{    at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)}}
> {{    at java.base/java.lang.Thread.run(Thread.java:840)}}
> My guess is that an exception occurs in 
> `SparkConnectPlanner.handleWriteOperationV2` after the query has succeeded, 
> but before we get to call `executeHolder.eventsManager.postFinished`. 
> However, I have not seen anything in the logs.
> I have not been able to construct a proof-of-concept to reproduce, but this 
> occurs very often on real workloads. I am unclear on the trigger: this almost 
> never occurs on 'small' jobs (meaning < 30mins and < 100,000 partitions, 
> say), but often (say, one in five) on larger jobs.
>  
> cc [~juliuszsompolski]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
