Script 'mail_helper' called by obssrc
Hello community,

here is the log from the commit of package python-distributed for 
openSUSE:Factory checked in at 2022-02-21 20:51:31
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-distributed (Old)
 and      /work/SRC/openSUSE:Factory/.python-distributed.new.1958 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-distributed"

Mon Feb 21 20:51:31 2022 rev:54 rq:956516 version:2022.1.1

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-distributed/python-distributed.changes	2022-02-02 22:45:01.130065708 +0100
+++ /work/SRC/openSUSE:Factory/.python-distributed.new.1958/python-distributed.changes	2022-02-21 20:51:33.286296309 +0100
@@ -1,0 +2,7 @@
+Mon Feb 14 15:12:44 UTC 2022 - Matej Cepl <mc...@suse.com>
+
+- Add 5709-avoid-deadlock-ActorFuture.patch to avoid deadlock in
+  ActorFuture (gh#dask/distributed#5709).
+
+-------------------------------------------------------------------
+

New:
----
  5709-avoid-deadlock-ActorFuture.patch

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-distributed.spec ++++++
--- /var/tmp/diff_new_pack.wRPaTq/_old  2022-02-21 20:51:33.910296579 +0100
+++ /var/tmp/diff_new_pack.wRPaTq/_new  2022-02-21 20:51:33.914296581 +0100
@@ -65,6 +65,9 @@
 URL:            https://distributed.readthedocs.io/en/latest/
 Source:         https://github.com/dask/distributed/archive/refs/tags//%{ghversiontag}.tar.gz#/distributed-%{ghversiontag}-gh.tar.gz
 Source99:       python-distributed-rpmlintrc
+# PATCH-FIX-UPSTREAM 5709-avoid-deadlock-ActorFuture.patch gh#dask/distributed#5709 mc...@suse.com
+# avoid deadlock in ActorFuture
+Patch0:         5709-avoid-deadlock-ActorFuture.patch
 BuildRequires:  %{python_module base >= 3.7}
 BuildRequires:  %{python_module setuptools}
 BuildRequires:  fdupes
@@ -123,6 +126,7 @@
 
 %prep
 %autosetup -p1 -n distributed-%{ghversiontag}
+
 sed -i  '/addopts/ {s/--durations=20//; s/--color=yes//}' setup.cfg
 
 %build

++++++ 5709-avoid-deadlock-ActorFuture.patch ++++++
From 9f54f73be83fd0b3e897d0c077f3132000c57336 Mon Sep 17 00:00:00 2001
From: Thomas Grainger <tagr...@gmail.com>
Date: Wed, 26 Jan 2022 15:30:16 +0000
Subject: [PATCH 01/10] avoid deadlock in ActorFuture

fixes #5708
fixes #5350
---
 distributed/__init__.py         |    2 
 distributed/actor.py            |  161 ++++++++++++++++++++++++++++++----------
 distributed/client.py           |    4 
 distributed/tests/test_actor.py |   95 +++++++++++++++++++----
 docs/source/actors.rst          |    8 -
 5 files changed, 209 insertions(+), 61 deletions(-)

--- a/distributed/__init__.py
+++ b/distributed/__init__.py
@@ -4,7 +4,7 @@ import dask
 from dask.config import config  # type: ignore
 
 from ._version import get_versions
-from .actor import Actor, ActorFuture
+from .actor import Actor, BaseActorFuture
 from .client import (
     Client,
     CompatibleExecutor,
--- a/distributed/actor.py
+++ b/distributed/actor.py
@@ -1,6 +1,15 @@
+from __future__ import annotations
+
+import abc
 import asyncio
 import functools
+import sys
 import threading
+from dataclasses import dataclass
+from datetime import timedelta
+from typing import TYPE_CHECKING, Generic, NoReturn, TypeVar
+
+from tornado.ioloop import IOLoop
 
 from .client import Future
 from .protocol import to_serialize
@@ -8,13 +17,52 @@ from .utils import iscoroutinefunction,
 from .utils_comm import WrappedKey
 from .worker import get_client, get_worker
 
+_T = TypeVar("_T")
+
+if sys.version_info >= (3, 9):
+    from collections.abc import Awaitable, Generator
+else:
+    from typing import Awaitable, Generator
+
+if sys.version_info >= (3, 8):
+    from typing import Literal
+elif TYPE_CHECKING:
+    from typing_extensions import Literal
+
+if sys.version_info >= (3, 10):
+    from asyncio import Event as _LateLoopEvent
+else:
+    # In python 3.10 asyncio.Lock and other primitives no longer support
+    # passing a loop kwarg to bind to a loop running in another thread
+    # e.g. calling from Client(asynchronous=False). Instead the loop is bound
+    # as late as possible: when calling any methods that wait on or wake
+    # Future instances. See: https://bugs.python.org/issue42392
+    class _LateLoopEvent:
+        def __init__(self) -> None:
+            self._event: asyncio.Event | None = None
+
+        def set(self) -> None:
+            if self._event is None:
+                self._event = asyncio.Event()
+
+            self._event.set()
+
+        def is_set(self) -> bool:
+            return self._event is not None and self._event.is_set()
+
+        async def wait(self) -> bool:
+            if self._event is None:
+                self._event = asyncio.Event()
+
+            return await self._event.wait()
+
 
 class Actor(WrappedKey):
     """Controls an object on a remote worker
 
     An actor allows remote control of a stateful object living on a remote
     worker.  Method calls on this object trigger operations on the remote
-    object and return ActorFutures on which we can block to get results.
+    object and return BaseActorFutures on which we can block to get results.
 
     Examples
     --------
@@ -36,7 +84,7 @@ class Actor(WrappedKey):
     >>> counter
     <Actor: Counter, key=Counter-1234abcd>
 
-    Calling methods on this object immediately returns deferred ``ActorFuture``
+    Calling methods on this object immediately returns deferred ``BaseActorFuture``
     objects.  You can call ``.result()`` on these objects to block and get the
     result of the function call.
 
@@ -140,9 +188,7 @@ class Actor(WrappedKey):
                 return attr
 
             elif callable(attr):
-                return lambda *args, **kwargs: ActorFuture(
-                    None, self._io_loop, result=attr(*args, **kwargs)
-                )
+                return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs))
             else:
                 return attr
 
@@ -166,16 +212,17 @@ class Actor(WrappedKey):
                             return await run_actor_function_on_worker()
                         else:
                             raise OSError("Unable to contact Actor's worker")
-                    return result
+                    if result["status"] == "OK":
+                        return _OK(result["result"])
+                    return _Error(result["exception"])
 
-                q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
+                actor_future = ActorFuture(io_loop=self._io_loop)
 
                 async def wait_then_add_to_queue():
-                    x = await run_actor_function_on_worker()
-                    await q.put(x)
+                    actor_future._set_result(await run_actor_function_on_worker())
 
                 self._io_loop.add_callback(wait_then_add_to_queue)
-                return ActorFuture(q, self._io_loop)
+                return actor_future
 
             return func
 
@@ -215,10 +262,10 @@ class ProxyRPC:
         return func
 
 
-class ActorFuture:
+class BaseActorFuture(abc.ABC, Awaitable[_T]):
     """Future to an actor's method call
 
-    Whenever you call a method on an Actor you get an ActorFuture immediately
+    Whenever you call a method on an Actor you get a BaseActorFuture immediately
     while the computation happens in the background.  You can call ``.result``
     to block and collect the full result
 
@@ -227,34 +274,72 @@ class ActorFuture:
     Actor
     """
 
-    def __init__(self, q, io_loop, result=None):
-        self.q = q
-        self.io_loop = io_loop
-        if result:
-            self._cached_result = result
-        self.status = "pending"
+    @abc.abstractmethod
+    def result(self, timeout: str | timedelta | float | None = None) -> _T:
+        ...
+
+    @abc.abstractmethod
+    def done(self) -> bool:
+        ...
 
-    def __await__(self):
-        return self._result().__await__()
+    def __repr__(self) -> Literal["<ActorFuture>"]:
+        return "<ActorFuture>"
 
-    def done(self):
-        return self.status != "pending"
 
-    async def _result(self, raiseit=True):
-        if not hasattr(self, "_cached_result"):
-            out = await self.q.get()
-            if out["status"] == "OK":
-                self.status = "finished"
-                self._cached_result = out["result"]
-            else:
-                self.status = "error"
-                self._cached_result = out["exception"]
-        if self.status == "error":
-            raise self._cached_result
-        return self._cached_result
+@dataclass(frozen=True, eq=False)
+class EagerActorFuture(BaseActorFuture[_T]):
+    """Future to an actor's method call when an actor calls another actor on 
the same worker"""
 
-    def result(self, timeout=None):
-        return sync(self.io_loop, self._result, callback_timeout=timeout)
+    _result: _T
 
-    def __repr__(self):
-        return "<ActorFuture>"
+    def __await__(self) -> Generator[object, None, _T]:
+        return self._result
+        yield
+
+    def result(self, timeout: object = None) -> _T:
+        return self._result
+
+    def done(self) -> Literal[True]:
+        return True
+
+
+@dataclass(frozen=True, eq=False)
+class _OK(Generic[_T]):
+    _v: _T
+
+    def unwrap(self) -> _T:
+        return self._v
+
+
+@dataclass(frozen=True, eq=False)
+class _Error:
+    _e: Exception
+
+    def unwrap(self) -> NoReturn:
+        raise self._e
+
+
+class ActorFuture(BaseActorFuture[_T]):
+    def __init__(self, io_loop: IOLoop):
+        self._io_loop = io_loop
+        self._event = _LateLoopEvent()
+        self._out: _Error | _OK[_T] | None = None
+
+    def __await__(self) -> Generator[object, None, _T]:
+        return self._result().__await__()
+
+    def done(self) -> bool:
+        return self._event.is_set()
+
+    async def _result(self) -> _T:
+        await self._event.wait()
+        out = self._out
+        assert out is not None
+        return out.unwrap()
+
+    def _set_result(self, out: _Error | _OK[_T]) -> None:
+        self._out = out
+        self._event.set()
+
+    def result(self, timeout: str | timedelta | float | None = None) -> _T:
+        return sync(self._io_loop, self._result, callback_timeout=timeout)
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -4943,11 +4943,11 @@ class as_completed:
         """Add multiple futures to the collection.
 
         The added futures will emit from the iterator once they finish"""
-        from .actor import ActorFuture
+        from .actor import BaseActorFuture
 
         with self.lock:
             for f in futures:
-                if not isinstance(f, (Future, ActorFuture)):
+                if not isinstance(f, (Future, BaseActorFuture)):
                     raise TypeError("Input must be a future, got %s" % f)
                 self.futures[f] += 1
                 self.loop.add_callback(self._track_future, f)
--- a/distributed/tests/test_actor.py
+++ b/distributed/tests/test_actor.py
@@ -8,7 +8,7 @@ import dask
 
 from distributed import (
     Actor,
-    ActorFuture,
+    BaseActorFuture,
     Client,
     Future,
     Nanny,
@@ -16,6 +16,7 @@ from distributed import (
     get_client,
     wait,
 )
+from distributed.actor import _LateLoopEvent
 from distributed.metrics import time
 from distributed.utils_test import cluster, gen_cluster
 
@@ -39,16 +40,6 @@ class Counter:
         return self.n
 
 
-class UsesCounter:
-    # An actor whose method argument is another actor
-
-    def do_inc(self, ac):
-        return ac.increment().result()
-
-    async def ado_inc(self, ac):
-        return await ac.ainc()
-
-
 class List:
     L: list = []
 
@@ -113,7 +104,7 @@ async def test_worker_actions(c, s, a, b
         assert counter._address == a_address
 
         future = counter.increment(separate_thread=separate_thread)
-        assert isinstance(future, ActorFuture)
+        assert isinstance(future, BaseActorFuture)
         assert "Future" in type(future).__name__
         end = future.result(timeout=1)
         assert end > start
@@ -266,6 +257,27 @@ def test_sync(client):
     assert "distributed.actor" not in repr(future)
 
 
+def test_timeout(client):
+    class Waiter:
+        def __init__(self):
+            self.event = _LateLoopEvent()
+
+        async def set(self):
+            self.event.set()
+
+        async def wait(self):
+            return await self.event.wait()
+
+    event = client.submit(Waiter, actor=True).result()
+    future = event.wait()
+
+    with pytest.raises(asyncio.TimeoutError):
+        future.result(timeout="0.001s")
+
+    event.set().result()
+    assert future.result() is True
+
+
 @gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "1s"})
 async def test_failed_worker(c, s, a, b):
     future = c.submit(Counter, actor=True, workers=[a.address])
@@ -538,11 +550,9 @@ async def test_actors_in_profile(c, s, a
 
 @gen_cluster(client=True)
 async def test_waiter(c, s, a, b):
-    from tornado.locks import Event
-
     class Waiter:
         def __init__(self):
-            self.event = Event()
+            self.event = _LateLoopEvent()
 
         async def set(self):
             self.event.set()
@@ -618,6 +628,42 @@ def test_worker_actor_handle_is_weakref_
 
 
 def test_one_thread_deadlock():
+    class UsesCounter:
+        # An actor whose method argument is another actor
+
+        def do_inc(self, ac):
+            return ac.increment().result()
+
+    with cluster(nworkers=2) as (cl, w):
+        client = Client(cl["address"])
+        ac = client.submit(Counter, actor=True).result()
+        ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result()
+
+        assert ac2.do_inc(ac).result() == 1
+
+
+def test_one_thread_deadlock_timeout():
+    class UsesCounter:
+        # An actor whose method argument is another actor
+
+        def do_inc(self, ac):
+            return ac.increment().result(timeout=1)
+
+    with cluster(nworkers=2) as (cl, w):
+        client = Client(cl["address"])
+        ac = client.submit(Counter, actor=True).result()
+        ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result()
+
+        assert ac2.do_inc(ac).result() == 1
+
+
+def test_one_thread_deadlock_sync_client():
+    class UsesCounter:
+        # An actor whose method argument is another actor
+
+        def do_inc(self, ac):
+            return get_client().sync(ac.increment)
+
     with cluster(nworkers=2) as (cl, w):
         client = Client(cl["address"])
         ac = client.submit(Counter, actor=True).result()
@@ -628,6 +674,12 @@ def test_one_thread_deadlock():
 
 @gen_cluster(client=True)
 async def test_async_deadlock(client, s, a, b):
+    class UsesCounter:
+        # An actor whose method argument is another actor
+
+        async def ado_inc(self, ac):
+            return await ac.ainc()
+
     ac = await client.submit(Counter, actor=True)
     ac2 = await client.submit(UsesCounter, actor=True, workers=[ac._address])
 
@@ -698,7 +750,7 @@ async def test_actor_future_awaitable(cl
     ac = await client.submit(Counter, actor=True)
     futures = [ac.increment() for _ in range(10)]
 
-    assert all([isinstance(future, ActorFuture) for future in futures])
+    assert all([isinstance(future, BaseActorFuture) for future in futures])
 
     out = await asyncio.gather(*futures)
     assert all([future.done() for future in futures])
@@ -706,6 +758,17 @@ async def test_actor_future_awaitable(cl
 
 
 @gen_cluster(client=True)
+async def test_actor_future_awaitable_deadlock(client, s, a, b):
+    ac = await client.submit(Counter, actor=True)
+    f = ac.increment()
+
+    async def coro():
+        return await f
+
+    assert await asyncio.gather(coro(), coro()) == [1, 1]
+
+
+@gen_cluster(client=True)
 async def test_serialize_with_pickle(c, s, a, b):
     class Foo:
         def __init__(self):
--- a/docs/source/actors.rst
+++ b/docs/source/actors.rst
@@ -115,15 +115,15 @@ However accessing an attribute or callin
 to the remote worker, run the method on the remote worker in a separate thread
 pool, and then communicate the result back to the calling side.  For attribute
 access these operations block and return when finished, for method calls they
-return an ``ActorFuture`` immediately.
+return a ``BaseActorFuture`` immediately.
 
 .. code-block:: python
 
-   >>> future = counter.increment()  # Immediately returns an ActorFuture
+   >>> future = counter.increment()  # Immediately returns a BaseActorFuture
    >>> future.result()               # Block until finished and result arrives
    1
 
-``ActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully
+``BaseActorFuture`` are similar to normal Dask ``Future`` objects, but not as fully
 featured.  They currently *only* support the ``result`` method and nothing else.
 They don't currently work with any other Dask functions that expect futures,
 like ``as_completed``, ``wait``, or ``client.gather``.  They can't be placed
@@ -167,7 +167,7 @@ workers have only a single thread for ac
 future.
 
 The result is sent back immediately to the calling side, and is not stored on
-the worker with the actor.  It is cached on the ``ActorFuture`` object.
+the worker with the actor.  It is cached on the ``BaseActorFuture`` object.
 
 
 Calling from coroutines and async/await
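
For readers who want to exercise the fix outside the test suite: the old queue-based ActorFuture handed its result to exactly one awaiter (a single asyncio.Queue.get), so awaiting the same future from two coroutines could hang, while the event-based implementation wakes every awaiter and caches the result. The following is only a minimal sketch mirroring test_actor_future_awaitable_deadlock above; it assumes distributed with this patch applied and that an in-process local cluster can be started.

import asyncio

from distributed import Client


class Counter:
    """Toy actor, same shape as the Counter used in the test suite above."""

    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n


async def main():
    async with Client(processes=False, asynchronous=True) as client:
        counter = await client.submit(Counter, actor=True)
        future = counter.increment()  # one ActorFuture, awaited twice below

        async def consume():
            return await future

        # Every awaiter is woken once the worker replies; with the old
        # queue-based future the second gather entry could wait forever.
        assert await asyncio.gather(consume(), consume()) == [1, 1]


asyncio.run(main())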

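The synchronous counterpart mirrors test_one_thread_deadlock_timeout above: an actor method that blocks, with a timeout, on another actor's future from the worker's single actor thread, which is the deadlock scenario the patch addresses. Again just a hedged sketch, reusing the same toy Counter and assuming a local two-worker cluster can be started.

from distributed import Client


class Counter:
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1
        return self.n


class UsesCounter:
    # An actor whose method blocks on another actor's future; before the
    # patch this could deadlock the worker's single actor thread.
    def do_inc(self, ac):
        return ac.increment().result(timeout=1)


if __name__ == "__main__":
    with Client(n_workers=2) as client:
        ac = client.submit(Counter, actor=True).result()
        # Pin the second actor to the first actor's worker to provoke the
        # single-thread scenario exercised by the tests.
        ac2 = client.submit(UsesCounter, actor=True, workers=[ac._address]).result()
        assert ac2.do_inc(ac).result() == 1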