[
https://issues.apache.org/jira/browse/SPARK-50176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17894685#comment-17894685
]
Peter Andrew commented on SPARK-50176:
--------------------------------------
I see, that makes more sense, thank you [~changgyoopark-db]!
> Connect operation stuck in `ReadyForExecution` even though query has finished
> -----------------------------------------------------------------------------
>
> Key: SPARK-50176
> URL: https://issues.apache.org/jira/browse/SPARK-50176
> Project: Spark
> Issue Type: Bug
> Components: Connect
> Affects Versions: 3.5.2
> Reporter: Peter Andrew
> Priority: Major
> Labels: pull-request-available
> Attachments: spark-issue-logs.log.gz
>
>
> Using PySpark Connect to run `writeTo(...).overwritePartitions(...)` commands,
> I frequently see that, for large queries, the client gets stuck and never
> returns, even though the SQL query is shown as successfully finished in the
> Spark UI and the data has been correctly written.
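> For reference, the client-side call pattern is roughly the following (a minimal
> sketch; the remote URL and table names are placeholders, not the real workload):
> {code:python}
> from pyspark.sql import SparkSession
>
> # Spark Connect client session (placeholder endpoint).
> spark = SparkSession.builder.remote("sc://spark-connect-host:15002").getOrCreate()
>
> df = spark.read.table("catalog.db.source_table")
>
> # The SQL query finishes successfully in the Spark UI and the data is written
> # correctly, but this call never returns on the client.
> df.writeTo("catalog.db.target_table").overwritePartitions()
> {code}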
> This seems to be due to the operation remaining stuck in `ReadyForExecution`
> even though the SQL query has finished; the gRPC observer is therefore never
> notified, and a response is never sent to the client.
> Even after the SQL query has finished, the following logs keep occurring over
> and over again:
> {{INFO ExecuteGrpcResponseSender: Starting for
> opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf, reattachable=true,
> lastConsumedStreamIndex=0}}
> {{TRACE ExecuteGrpcResponseSender: Trying to get next response with index=1.}}
> {{TRACE ExecuteGrpcResponseSender: Acquired executionObserver lock.}}
> {{TRACE ExecuteGrpcResponseSender: Try to get response with index=1 from
> observer.}}
> {{TRACE ExecuteGrpcResponseSender: Response index=1 from observer: false}}
> {{TRACE ExecuteGrpcResponseSender: Wait for response to become available
> with timeout=120000 ms.}}
> The request is still shown in the Spark UI under the Connect tab, in the
> Requests table, with status RUNNING.
> Then, a while after shutting down the client, I see the following:
> {{INFO SparkConnectExecutionManager: Started periodic run of
> SparkConnectExecutionManager maintenance.}}
> {{INFO SparkConnectExecutionManager: Finished periodic run of
> SparkConnectExecutionManager maintenance.}}
> {{INFO SparkConnectExecutionManager: Started periodic run of
> SparkConnectExecutionManager maintenance.}}
> {{INFO SparkConnectExecutionManager: Found execution ExecuteInfo(session_id:
> "9075a58c-f298-496d-afea-c76fc09d37e3", ..., operation_id:
> "8ee873b7-32b9-4eb6-963f-21318e2aebcf",
> ,,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf,
> SparkConnect_OperationTag_User__Session_9075a58c-f298-496d-afea-c76fc09d37e3_Operation_8ee873b7-32b9-4eb6-963f-21318e2aebcf,
> Set(),true,ReadyForExecution,1730282712338,Some(1730290272497),None)
> that was abandoned and expired and will be removed.}}
> {{INFO SparkConnectExecutionManager: ExecuteHolder
> ExecuteKey(,9075a58c-f298-496d-afea-c76fc09d37e3,8ee873b7-32b9-4eb6-963f-21318e2aebcf)
> is removed.}}
> {{INFO ExecuteResponseObserver: Release all for
> opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats:
> total=CachedSize(0,0) autoRemoved=CachedSize(0,0)
> cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0)
> maxCachedUntilConsumed=CachedSize(0,0)
> maxCachedUntilProduced=CachedSize(0,0)}}
> {{INFO ExecuteResponseObserver: Release all for
> opId=8ee873b7-32b9-4eb6-963f-21318e2aebcf. Execution stats:
> total=CachedSize(0,0) autoRemoved=CachedSize(0,0)
> cachedUntilConsumed=CachedSize(0,0) cachedUntilProduced=CachedSize(0,0)
> maxCachedUntilConsumed=CachedSize(0,0)
> maxCachedUntilProduced=CachedSize(0,0)}}
> {{java.lang.IllegalStateException: }}
> {{ operationId: 8ee873b7-32b9-4eb6-963f-21318e2aebcf with status
> ReadyForExecution}}
> {{ is not within statuses List(Finished, Failed, Canceled) for event
> Closed}}
> {{ at
> org.apache.spark.sql.connect.service.ExecuteEventsManager.assertStatus(ExecuteEventsManager.scala:261)}}
> {{ at
> org.apache.spark.sql.connect.service.ExecuteEventsManager.postClosed(ExecuteEventsManager.scala:229)}}
> {{ at
> org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1(ExecuteHolder.scala:240)}}
> {{ at
> org.apache.spark.sql.connect.service.ExecuteHolder.$anonfun$close$1$adapted(ExecuteHolder.scala:234)}}
> {{ at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)}}
> {{ at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)}}
> {{ at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)}}
> {{ at java.base/java.lang.Thread.run(Thread.java:840)}}
> My guess is that an exception occurs in
> `SparkConnectPlanner.handleWriteOperationV2` after the query has succeeded,
> but before we get to call `executeHolder.eventsManager.postFinished`.
> However, I have not seen any such exception in the logs.
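> The assertion seen in the stack trace above boils down to a simple invariant;
> the following standalone sketch (illustrative only, not Spark's actual code)
> shows why an operation left in `ReadyForExecution` then fails when the
> abandoned execution is eventually closed:
> {code:python}
> # Closed is only legal after Finished, Failed or Canceled, so skipping
> # postFinished leaves the operation in ReadyForExecution and the later
> # cleanup of the abandoned execution asserts.
> CLOSABLE = {"Finished", "Failed", "Canceled"}
>
> def post_closed(status: str) -> None:
>     if status not in CLOSABLE:
>         raise RuntimeError(
>             f"operation with status {status} is not within {sorted(CLOSABLE)} for event Closed")
>
> post_closed("Finished")  # normal path: postFinished ran before the close
> try:
>     post_closed("ReadyForExecution")  # suspected bug path: postFinished never reached
> except RuntimeError as e:
>     print(e)  # mirrors the assertStatus failure in ExecuteEventsManager
> {code}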
> I have not been able to construct a proof of concept that reproduces this, but
> it occurs very often on real workloads. I am unclear on the trigger: it almost
> never occurs on 'small' jobs (say, < 30 minutes and < 100,000 partitions), but
> often (roughly one in five) on larger jobs.
>
> cc [~juliuszsompolski]