[ 
https://issues.apache.org/jira/browse/SPARK-55362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-55362.
----------------------------------
    Fix Version/s: 4.2.0
       Resolution: Fixed

Issue resolved by pull request 54145
[https://github.com/apache/spark/pull/54145]

> Connect client can deadlock when it is shut down from its thread pool
> ---------------------------------------------------------------------
>
>                 Key: SPARK-55362
>                 URL: https://issues.apache.org/jira/browse/SPARK-55362
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, PySpark
>    Affects Versions: 4.2.0
>            Reporter: Tian Gao
>            Assignee: Tian Gao
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.2.0
>
>
> `__del__` can be triggered on any thread, including one of the thread pool's 
> own workers. Shutting the pool down from inside one of its workers makes that 
> worker try to join itself, causing a deadlock.
> {code:python}
> Traceback for thread 165248 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
>         self._target(*self._args, **self._kwargs)
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, in _worker
>         work_item.run()
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in run
>         result = self.fn(*self.args, **self.kwargs)
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 238, in target
>         self._stub.ReleaseExecute(request, metadata=self._metadata)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1163, in __call__
>         state, call = self._blocking(
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1134, in _blocking
>         call = self._channel.segregated_call(
>     (Python) File "/usr/lib/python3.12/threading.py", line 942, in __init__
>         _dangling.add(self)
>     (Python) File "/usr/lib/python3.12/_weakrefset.py", line 85, in add
>         def add(self, item):
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/session.py", line 872, in __del__
>         self.client.close()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1275, in close
>         ExecutePlanResponseReattachableIterator.shutdown()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 84, in shutdown
>         thread_pool.shutdown()
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 239, in shutdown
>         t.join()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1149, in join
>         self._wait_for_tstate_lock()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1169, in _wait_for_tstate_lock
>         if lock.acquire(block, timeout):
> Traceback for thread 165239 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
>         self._target(*self._args, **self._kwargs)
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, in _worker
>         work_item.run()
>     (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, in run
>         result = self.fn(*self.args, **self.kwargs)
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 211, in target
>         self._stub.ReleaseExecute(request, metadata=self._metadata)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1163, in __call__
>         state, call = self._blocking(
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1150, in _blocking
>         event = call.next_event()
>     (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
>         return self._lock.__enter__()
> Traceback for thread 165221 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
>         self._target(*self._args, **self._kwargs)
>     (Python) File "/usr/lib/python3.12/socketserver.py", line 235, in serve_forever
>         ready = selector.select(poll_interval)
>     (Python) File "/usr/lib/python3.12/selectors.py", line 415, in select
>         fd_event_list = self._selector.poll(timeout)
> Traceback for thread 164693 (python3.12) [] (most recent call last):
>     (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
>         self._bootstrap_inner()
>     (Python) File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
>         self.run()
>     (Python) File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 58, in run
> Traceback for thread 164627 (python3.12) [] (most recent call last):
>     (Python) File "<frozen runpy>", line 198, in _run_module_as_main
>     (Python) File "<frozen runpy>", line 88, in _run_code
>     (Python) File "/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", line 331, in <module>
>         main()
>     (Python) File "/__w/spark/spark/python/pyspark/testing/unittestutils.py", line 38, in main
>         res = unittest.main(module=module, testRunner=testRunner, verbosity=2, exit=False)
>     (Python) File "/usr/lib/python3.12/unittest/main.py", line 105, in __init__
>         self.runTests()
>     (Python) File "/usr/lib/python3.12/unittest/main.py", line 281, in runTests
>         self.result = testRunner.run(self.test)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/xmlrunner/runner.py", line 67, in run
>         test(result)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
>         return self.run(*args, **kwds)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
>         test(result)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
>         return self.run(*args, **kwds)
>     (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
>         test(result)
>     (Python) File "/usr/lib/python3.12/unittest/case.py", line 690, in __call__
>         return self.run(*args, **kwds)
>     (Python) File "/usr/lib/python3.12/unittest/case.py", line 634, in run
>         self._callTestMethod(testMethod)
>     (Python) File "/usr/lib/python3.12/unittest/case.py", line 589, in _callTestMethod
>         if method() is not None:
>     (Python) File "/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", line 139, in test_error_enrichment_jvm_stacktrace
>         ).collect()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1826, in collect
>         table, schema = self._to_table()
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1846, in _to_table
>         table, schema, self._execution_info = self._session.client.to_table(
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1038, in to_table
>         table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1725, in _execute_and_fetch
>         for response in self._execute_and_fetch_as_iterator(
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1682, in _execute_and_fetch_as_iterator
>         generator = ExecutePlanResponseReattachableIterator(
>     (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in __init__
>         self._stub.ExecutePlan(self._initial_request, metadata=metadata)
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1396, in __call__
>         call = self._managed_call(
>     (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", line 1785, in create
>         call = state.channel.integrated_call(
>     (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
>         return self._lock.__enter__()
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
