Tian Gao created SPARK-55362:
--------------------------------

             Summary: Connect client can deadlock when it is shut down from a 
thread pool
                 Key: SPARK-55362
                 URL: https://issues.apache.org/jira/browse/SPARK-55362
             Project: Spark
          Issue Type: Bug
          Components: Connect, PySpark
    Affects Versions: 4.2.0
            Reporter: Tian Gao


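`__del__` can be triggered in any thread, including a worker thread of the 
client's own thread pool. When that happens, the thread pool is shut down from 
inside the pool itself, which causes a deadlock. In the thread dump below, 
worker thread 165248 runs `__del__` in the middle of a gRPC call and blocks in 
`thread_pool.shutdown()` while joining the pool's threads; worker thread 165239 
and the main thread (164627) are both blocked acquiring a lock inside gRPC.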

{code:python}
Traceback for thread 165248 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in 
_bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, 
in _worker
        work_item.run()
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, 
in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 238, in 
target
        self._stub.ReleaseExecute(request, metadata=self._metadata)
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", 
line 1163, in __call__
        state, call = self._blocking(
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", 
line 1134, in _blocking
        call = self._channel.segregated_call(
    (Python) File "/usr/lib/python3.12/threading.py", line 942, in __init__
        _dangling.add(self)
    (Python) File "/usr/lib/python3.12/_weakrefset.py", line 85, in add
        def add(self, item):
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/session.py", 
line 872, in __del__
        self.client.close()
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", 
line 1275, in close
        ExecutePlanResponseReattachableIterator.shutdown()
    (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 84, in 
shutdown
        thread_pool.shutdown()
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 239, 
in shutdown
        t.join()
    (Python) File "/usr/lib/python3.12/threading.py", line 1149, in join
        self._wait_for_tstate_lock()
    (Python) File "/usr/lib/python3.12/threading.py", line 1169, in 
_wait_for_tstate_lock
        if lock.acquire(block, timeout):

Traceback for thread 165239 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in 
_bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 93, 
in _worker
        work_item.run()
    (Python) File "/usr/lib/python3.12/concurrent/futures/thread.py", line 59, 
in run
        result = self.fn(*self.args, **self.kwargs)
    (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 211, in 
target
        self._stub.ReleaseExecute(request, metadata=self._metadata)
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", 
line 1163, in __call__
        state, call = self._blocking(
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", 
line 1150, in _blocking
        event = call.next_event()
    (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
        return self._lock.__enter__()

Traceback for thread 165221 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in 
_bootstrap_inner
        self.run()
    (Python) File "/usr/lib/python3.12/threading.py", line 1012, in run
        self._target(*self._args, **self._kwargs)
    (Python) File "/usr/lib/python3.12/socketserver.py", line 235, in 
serve_forever
        ready = selector.select(poll_interval)
    (Python) File "/usr/lib/python3.12/selectors.py", line 415, in select
        fd_event_list = self._selector.poll(timeout)

Traceback for thread 164693 (python3.12) [] (most recent call last):
    (Python) File "/usr/lib/python3.12/threading.py", line 1032, in _bootstrap
        self._bootstrap_inner()
    (Python) File "/usr/lib/python3.12/threading.py", line 1075, in 
_bootstrap_inner
        self.run()
    (Python) File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 
58, in run

Traceback for thread 164627 (python3.12) [] (most recent call last):
    (Python) File "<frozen runpy>", line 198, in _run_module_as_main
    (Python) File "<frozen runpy>", line 88, in _run_code
    (Python) File 
"/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", 
line 331, in <module>
        main()
    (Python) File "/__w/spark/spark/python/pyspark/testing/unittestutils.py", 
line 38, in main
        res = unittest.main(module=module, testRunner=testRunner, verbosity=2, 
exit=False)
    (Python) File "/usr/lib/python3.12/unittest/main.py", line 105, in __init__
        self.runTests()
    (Python) File "/usr/lib/python3.12/unittest/main.py", line 281, in runTests
        self.result = testRunner.run(self.test)
    (Python) File 
"/usr/local/lib/python3.12/dist-packages/xmlrunner/runner.py", line 67, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 84, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/suite.py", line 122, in run
        test(result)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 690, in __call__
        return self.run(*args, **kwds)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 634, in run
        self._callTestMethod(testMethod)
    (Python) File "/usr/lib/python3.12/unittest/case.py", line 589, in 
_callTestMethod
        if method() is not None:
    (Python) File 
"/__w/spark/spark/python/pyspark/sql/tests/connect/test_connect_session.py", 
line 139, in test_error_enrichment_jvm_stacktrace
        ).collect()
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", 
line 1826, in collect
        table, schema = self._to_table()
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", 
line 1846, in _to_table
        table, schema, self._execution_info = self._session.client.to_table(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", 
line 1038, in to_table
        table, schema, metrics, observed_metrics, _ = 
self._execute_and_fetch(req, observations)
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", 
line 1725, in _execute_and_fetch
        for response in self._execute_and_fetch_as_iterator(
    (Python) File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", 
line 1682, in _execute_and_fetch_as_iterator
        generator = ExecutePlanResponseReattachableIterator(
    (Python) File 
"/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 127, in 
__init__
        self._stub.ExecutePlan(self._initial_request, metadata=metadata)
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", 
line 1396, in __call__
        call = self._managed_call(
    (Python) File "/usr/local/lib/python3.12/dist-packages/grpc/_channel.py", 
line 1785, in create
        call = state.channel.integrated_call(
    (Python) File "/usr/lib/python3.12/threading.py", line 300, in __enter__
        return self._lock.__enter__()
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to