Peter Andrew created SPARK-50176:
------------------------------------

             Summary: Connect operation stuck in `ReadyForExecution` even 
though query has finished
                 Key: SPARK-50176
                 URL: https://issues.apache.org/jira/browse/SPARK-50176
             Project: Spark
          Issue Type: Bug
          Components: Connect
    Affects Versions: 3.5.2
            Reporter: Peter Andrew


When using PySpark Connect to run `writeTo(...).overwritePartitions(...)` commands, I frequently see that for large queries the client gets stuck and never returns, even though the SQL query is shown as successfully finished in the Spark UI and the data has been written correctly.
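
For reference, a minimal sketch of the shape of the client code (remote URL, table name and data are placeholders, not the real workload):

{code:python}
# Minimal sketch of the kind of call that hangs; everything here is a
# placeholder except the writeTo(...).overwritePartitions() pattern itself.
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://connect-server:15002").getOrCreate()

df = spark.range(0, 10_000_000).withColumnRenamed("id", "key")

# DataFrameWriterV2: overwrite the partitions touched by df in the target table.
# On large jobs this call sometimes never returns, even after the Spark UI
# shows the underlying SQL query as finished.
df.writeTo("catalog.db.target_table").overwritePartitions()
{code}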

This seems to be caused by the operation remaining stuck in `ReadyForExecution` even though the SQL query has finished. As a result, the gRPC observer is never notified, and a response is never sent to the client.

Even after the SQL query has finished, the following logs keep occurring over and over again:

{{INFO ExecuteGrpcResponseSender: Starting for 
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf, reattachable=true, 
lastConsumedStreamIndex=0}}
{{TRACE ExecuteGrpcResponseSender: Trying to get next response with index=1.}}
{{TRACE ExecuteGrpcResponseSender: Acquired executionObserver lock.}}
{{TRACE ExecuteGrpcResponseSender: Try to get response with index=1 from 
observer.}}
{{TRACE ExecuteGrpcResponseSender: Response index=1 from observer: false}}
{{TRACE ExecuteGrpcResponseSender: Wait for response to become available with timeout=120000 ms.}}
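
If I read the TRACE lines correctly, the response sender is just looping: it asks the observer for the response with index=1, gets nothing, waits with a 120 s timeout, and starts over; since the observer is never notified (see above), this never terminates. A simplified Python model of that loop, purely for illustration (the actual code is the Scala `ExecuteGrpcResponseSender`):

{code:python}
# Simplified illustration only -- not the actual Spark Connect code.
# The sender keeps asking the observer for the next response and waits on a
# condition with a 120 s timeout; if the observer is never notified, the wait
# keeps timing out and the loop repeats forever, so nothing reaches the client.
import threading

class ObserverModel:
    def __init__(self):
        self.cond = threading.Condition()
        self.responses = []          # populated once the execution produces output

    def get(self, index):
        # 1-based index, mirroring "Try to get response with index=1 from observer"
        return self.responses[index - 1] if index <= len(self.responses) else None

def send_responses(observer: ObserverModel, timeout_s: float = 120.0):
    index = 1
    while True:
        with observer.cond:
            response = observer.get(index)
            if response is None:
                # "Wait for response to become available with timeout=120000 ms."
                observer.cond.wait(timeout=timeout_s)
                continue             # nothing arrived: re-check and wait again
        # ... stream `response` to the client over gRPC ...
        index += 1
{code}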

The request is still shown in the Spark UI under the Connect tab in the 
Requests table, with status RUNNING.

Then, a while after shutting down the client, I see the following:

{{INFO SparkConnectExecutionManager: Started periodic run of 
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Finished periodic run of 
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Started periodic run of 
SparkConnectExecutionManager maintenance.}}
{{INFO SparkConnectExecutionManager: Found execution ExecuteInfo(session_id: 
"9075a58c-f298-496d-afea-c76fc09d37e3", ...,}}
{{    operation_id: "8ee873b7-32b9-4eb6-963f-21318e2aebcf"}}
{{    
,,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
{{    
SparkConnect_OperationTag_User__Session_9075a58c-f298-496d-afea-c76fc09d37e3_Operation_8ee873b7-32b9-4eb6-963f-21318e2aebcf,}}
{{    Set(),true,ReadyForExecution,1730282712338,Some(1730290272497),None)}}
{{    that was abandoned and expired and will be removed.}}
{{INFO SparkConnectExecutionManager: ExecuteHolder 
ExecuteKey(,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf)
 is removed.}}
{{INFO ExecuteResponseObserver: Release all for 
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: 
total=CachedSize(0,0) autoRemoved=CachedSize(0,0) 
cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) 
maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)}}
{{INFO ExecuteResponseObserver: Release all for 
opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats: 
total=CachedSize(0,0) autoRemoved=CachedSize(0,0) 
cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0) 
maxCachedUntilConsumed=CachedSize(0,0) maxCachedUntilProduced=CachedSize(0,0)}}
{{java.lang.IllegalStateException: }}
{{        operationId: 8ee873b7-32b9-4eb6-963f-21318e2aebcf with status 
ReadyForExecution}}
{{        is not within statuses List(Finished, Failed, Canceled) for event 
Closed}}
{{        }}
{{    at 
org.apache.spark.sql.connect.service.ExecuteEventsManager.assertStatus(ExecuteEventsManager.scala:261)}}
{{    at 
org.apache.spark.sql.connect.service.ExecuteEventsManager.postClosed(ExecuteEventsManager.scala:229)}}
{{    at 
org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1(ExecuteHolder.scala:240)}}
{{    at 
org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1$adapted(ExecuteHolder.scala:234)}}
{{    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)}}
{{    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)}}
{{    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)}}
{{    at java.base/java.lang.Thread.run(Thread.java:840)}}

I have not been able to construct a proof-of-concept that reproduces this, but it occurs very often on real workloads. I am unclear what the trigger is: it almost never happens on 'small' jobs (say, < 30 minutes and < 100,000 partitions), but often (roughly one in five runs) on larger jobs.
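
In case it helps anyone trying to reproduce, this is roughly how I detect the hang from the client side (remote URL, input path, table name and deadline are placeholders): run the write on a daemon thread and treat it as stuck if it has not returned long after the job should have completed.

{code:python}
# Rough client-side watchdog sketch; all identifiers and paths are placeholders.
import threading
from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://connect-server:15002").getOrCreate()
df = spark.read.parquet("/path/to/large/input")

def do_write():
    df.writeTo("catalog.db.target_table").overwritePartitions()

writer = threading.Thread(target=do_write, daemon=True)
writer.start()
writer.join(timeout=4 * 3600)   # generous deadline for the job itself

if writer.is_alive():
    # The SQL query may already show as finished in the Spark UI at this point,
    # yet the Connect client call has still not returned.
    print("overwritePartitions() is still blocked; operation likely stuck in ReadyForExecution")
{code}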

cc [~juliuszsompolski]


