[
https://issues.apache.org/jira/browse/SPARK-55362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon resolved SPARK-55362.
----------------------------------
Fix Version/s: 4.2.0
Resolution: Fixed
Issue resolved by pull request 54145
[https://github.com/apache/spark/pull/54145]
> Connect client can deadlock when it is shut down from its own thread pool
> -------------------------------------------------------------------------
>
> Key: SPARK-55362
> URL: https://issues.apache.org/jira/browse/SPARK-55362
> Project: Spark
> Issue Type: Bug
> Components: Connect, PySpark
> Affects Versions: 4.2.0
> Reporter: Tian Gao
> Assignee: Tian Gao
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.2.0
>
>
> `__del__` can be triggered on any thread, including a worker thread of the client's own
> thread pool. When that happens, closing the client shuts the thread pool down from inside
> the pool itself, and the blocking `shutdown()` waits on worker threads that can never
> finish, causing a deadlock.
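>
> The deadlock shape is "join a thread while holding something it needs": the worker running
> `__del__` ends up calling the pool's blocking shutdown from inside the pool, which joins
> another worker that cannot make progress. A minimal standalone sketch of that shape (plain
> `threading`, with an explicit lock for determinism; names and the lock are illustrative, this
> is not the Spark code) looks like this:
> {code:python}
> import threading
>
> lock = threading.Lock()
>
>
> def worker_b():
>     # Stands in for the second release worker: it blocks on a resource that
>     # worker_a holds and will never release.
>     with lock:
>         pass
>
>
> b = threading.Thread(target=worker_b)
>
>
> def worker_a():
>     # Stands in for the worker whose __del__ triggers the pool shutdown: it
>     # holds a resource and then joins the other thread, which is exactly what
>     # ThreadPoolExecutor.shutdown(wait=True) does with every worker thread.
>     with lock:
>         b.start()
>         b.join()  # never returns: worker_b is waiting for `lock`
>
>
> threading.Thread(target=worker_a).start()  # running this hangs by design
> {code}
> The thread dump below shows the stuck threads: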
> {code:python}
> Traceback for thread 165248 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
>         self._target(*self._args, **self._kwargs)
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, in _worker
>         work_item.run()
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in run
>         result = self.fn(*self.args, **self.kwargs)
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 238, in target
>         self._stub.ReleaseExecute(request, metadata=self._metadata)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1163, in __call__
>         state, call = self._blocking(
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1134, in _blocking
>         call = self._channel.segregated_call(
>     (Python) File "/usr/lib/python3.12/threading.py", line 942, in __init__
>         _dangling.add(self)
>     (Python) File "/usr/lib/python3.12/_weakrefset.py", line 85, in add
>         def add(self, item):
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/session.py", line 872, in __del__
>         self.client.close()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1275, in close
>         ExecutePlanResponseReattachableIterator.shutdown()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 84, in shutdown
>         thread_pool.shutdown()
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 239, in shutdown
>         t.join()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1149, in join
>         self._wait_for_tstate_lock()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1169, in _wait_for_tstate_lock
>         if lock.acquire(block, timeout):
> Traceback for thread 165239 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
>         self._target(*self._args, **self._kwargs)
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, in _worker
>         work_item.run()
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in run
>         result = self.fn(*self.args, **self.kwargs)
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 211, in target
>         self._stub.ReleaseExecute(request, metadata=self._metadata)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1163, in __call__
>         state, call = self._blocking(
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1150, in _blocking
>         event = call.next_event()
>     (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
>         return self._lock.__enter__()
> Traceback for thread 165221 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
>         self._target(*self._args, **self._kwargs)
>     (Python) File "/usr/lib/python3.12/socketserver.py", line 235, in serve_forever
>         ready = selector.select(poll_interval)
>     (Python) File "/usr/lib/python3.12/selectors.py", line 415, in select
>         fd_event_list = self._selector.poll(timeout)
> Traceback for thread 164693 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 58, in run
> Traceback for thread 164627 (python3.12) [] (most recent call last):
>     (Python) File "<frozen runpy>", line 198, in _run_module_as_main
>     (Python) File "<frozen runpy>", line 88, in _run_code
>     (Python) File "/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", line 331, in <module>
>         main()
>     (Python) File "/__w/spark/spark/python/pyspark/testing/unittestutils.py", line 38, in main
>         res = unittest.main(module=module, testRunner=testRunner, verbosity=2, exit=False)
>     (Python) File "/usr/lib/python3.12/unittest/main.py", line 105, in __init__
>         self.runTests()
>     (Python) File "/usr/lib/python3.12/unittest/main.py", line 281, in runTests
>         self.result = testRunner.run(self.test)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/xmlrunner/runner.py", line 67, in run
>         test(result)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
>         return self.run(*args, **kwds)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
>         test(result)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
>         return self.run(*args, **kwds)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
>         test(result)
>     (Python) File "/usr/lib/python3.12/unittest/case.py", line 690, in __call__
>         return self.run(*args, **kwds)
>     (Python) File "/usr/lib/python3.12/unittest/case.py", line 634, in run
>         self._callTestMethod(testMethod)
>     (Python) File "/usr/lib/python3.12/unittest/case.py", line 589, in _callTestMethod
>         if method() is not None:
>     (Python) File "/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", line 139, in test_error_enrichment_jvm_stacktrace
>         ).collect()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1826, in collect
>         table, schema = self._to_table()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1846, in _to_table
>         table, schema, self._execution_info = self._session.client.to_table(
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1038, in to_table
>         table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1725, in _execute_and_fetch
>         for response in self._execute_and_fetch_as_iterator(
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1682, in _execute_and_fetch_as_iterator
>         generator = ExecutePlanResponseReattachableIterator(
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in __init__
>         self._stub.ExecutePlan(self._initial_request, metadata=metadata)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1396, in __call__
>         call = self._managed_call(
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1785, in create
>         call = state.channel.integrated_call(
>     (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
>         return self._lock.__enter__()
> {code}
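>
> One common way to avoid this class of deadlock is to skip the blocking join when the
> shutdown request originates from one of the pool's own worker threads. The sketch below is
> only illustrative (the pool, its name prefix, and the `safe_shutdown` helper are hypothetical)
> and is not necessarily how the linked pull request fixes the issue:
> {code:python}
> import threading
> from concurrent.futures import ThreadPoolExecutor
>
> # Hypothetical stand-in for the client's release-execute thread pool.
> _pool = ThreadPoolExecutor(max_workers=4, thread_name_prefix="connect-release")
>
>
> def safe_shutdown() -> None:
>     # If __del__ (and therefore close()) fired on one of the pool's own
>     # workers, a blocking shutdown would join threads that cannot finish.
>     # Worker threads carry the thread_name_prefix, so detect that case and
>     # fall back to a non-blocking shutdown instead.
>     caller_is_worker = threading.current_thread().name.startswith("connect-release")
>     _pool.shutdown(wait=not caller_is_worker)
> {code}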
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]