Tian Gao created SPARK-55362:
--------------------------------
Summary: Connect client can deadlock when it is shut down from within its own thread pool
Key: SPARK-55362
URL: https://issues.apache.org/jira/browse/SPARK-55362
Project: Spark
Issue Type: Bug
Components: Connect, PySpark
Affects Versions: 4.2.0
Reporter: Tian Gao
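The Connect session's `__del__` can be triggered on any thread, including one of the client's own thread-pool worker threads. It calls `self.client.close()`, which calls `ExecutePlanResponseReattachableIterator.shutdown()`, which in turn calls `thread_pool.shutdown()`. The thread pool is therefore shut down from inside the pool itself, and this can deadlock.

The dump below shows this:
- Worker thread 165248 was running `ReleaseExecute` when `__del__` fired. It is now blocked in `t.join()` inside `ThreadPoolExecutor.shutdown()`, waiting for the pool's threads to exit.
- Another worker (thread 165239) is waiting to acquire a lock inside gRPC.
- The main thread (164627) is also waiting to acquire a lock inside gRPC.
- The lock appears to be held by thread 165248, which is itself stuck (to be confirmed).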
{code:python}
Traceback for thread 165248 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, in _worker
        work_item.run()
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 238, in target
        self._stub.ReleaseExecute(request, metadata=self._metadata)
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1163, in __call__
        state, call = self._blocking(
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1134, in _blocking
        call = self._channel.segregated_call(
    (Python) File "/usr/lib/python3.12/threading.py", line 942, in __init__
        _dangling.add(self)
    (Python) File "/usr/lib/python3.12/_weakrefset.py", line 85, in add
        def add(self, item):
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/session.py", line 872, in __del__
        self.client.close()
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1275, in close
        ExecutePlanResponseReattachableIterator.shutdown()
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 84, in shutdown
        thread_pool.shutdown()
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 239, in shutdown
        t.join()
    (Python) File "/usr/lib/python3.12/threading.py", line 1149, in join
        self._wait_for_tstate_lock()
    (Python) File "/usr/lib/python3.12/threading.py", line 1169, in _wait_for_tstate_lock
        if lock.acquire(block, timeout):
Traceback for thread 165239 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, in _worker
        work_item.run()
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 211, in target
        self._stub.ReleaseExecute(request, metadata=self._metadata)
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1163, in __call__
        state, call = self._blocking(
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1150, in _blocking
        event = call.next_event()
    (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
        return self._lock.__enter__()
Traceback for thread 165221 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.12/socketserver.py", line 235, in serve_forever
        ready = selector.select(poll_interval)
    (Python) File "/usr/lib/python3.12/selectors.py", line 415, in select
        fd_event_list = self._selector.poll(timeout)
Traceback for thread 164693 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
        self.run()
    (Python) File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 58, in run
Traceback for thread 164627 (python3.12) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    (Python) File "/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", line 331, in <module>
        main()
    (Python) File "/__w/spark/spark/python/pyspark/testing/unittestutils.py", line 38, in main
        res = unittest.main(module=module, testRunner=testRunner, verbosity=2, exit=False)
    (Python) File "/usr/lib/python3.12/unittest/main.py", line 105, in __init__
        self.runTests()
    (Python) File "/usr/lib/python3.12/unittest/main.py", line 281, in runTests
        self.result = testRunner.run(self.test)
    (Python) File "/usr/local/lib/python3.12/dist-packages/xmlrunner/runner.py", line 67, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 690, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 634, in run
        self._callTestMethod(testMethod)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 589, in _callTestMethod
        if method() is not None:
    (Python) File "/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", line 139, in test_error_enrichment_jvm_stacktrace
        ).collect()
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1826, in collect
        table, schema = self._to_table()
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1846, in _to_table
        table, schema, self._execution_info = self._session.client.to_table(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1038, in to_table
        table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1725, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1682, in _execute_and_fetch_as_iterator
        generator = ExecutePlanResponseReattachableIterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in __init__
        self._stub.ExecutePlan(self._initial_request, metadata=metadata)
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1396, in __call__
        call = self._managed_call(
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1785, in create
        call = state.channel.integrated_call(
    (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
        return self._lock.__enter__()
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]